From 31b23f50c235807b6b5f096238e513c8fec60701 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Fri, 14 Jul 2017 20:49:36 +0200 Subject: [PATCH] Performance improved. --- .../MongoEventConsumerInfo.cs | 2 +- .../MongoEventConsumerInfoRepository.cs | 32 ++- .../EventStore/MongoEventStore.cs | 206 +++++++++++------- .../EventStore/StreamPosition.cs | 63 ++++++ .../FieldDefinitionBuilder.cs | 33 +++ .../MongoRepositoryBase.cs | 69 +----- .../CQRS/Events/IEventStore.cs | 2 +- .../Config/Domain/StoreMongoDbModule.cs | 2 +- .../ContentApi/ContentsController.cs | 1 - tests/Benchmarks/Program.cs | 11 +- .../Benchmarks/Properties/launchSettings.json | 2 +- tests/Benchmarks/Tests/AppendToEventStore.cs | 13 +- ...s => AppendToEventStoreWithManyWriters.cs} | 17 +- tests/Benchmarks/Tests/HandleEvents.cs | 81 +------ .../Tests/HandleEventsWithManyWriters.cs | 111 ++++++++++ tests/Benchmarks/Tests/TestData/MyEvent.cs | 19 ++ .../Tests/TestData/MyEventConsumer.cs | 81 +++++++ .../Contents/GraphQLTests.cs | 2 +- 18 files changed, 497 insertions(+), 250 deletions(-) rename src/Squidex.Infrastructure.MongoDb/{ => EventStore}/MongoEventConsumerInfo.cs (95%) rename src/Squidex.Infrastructure.MongoDb/{ => EventStore}/MongoEventConsumerInfoRepository.cs (59%) create mode 100644 src/Squidex.Infrastructure.MongoDb/EventStore/StreamPosition.cs create mode 100644 src/Squidex.Infrastructure.MongoDb/FieldDefinitionBuilder.cs rename tests/Benchmarks/Tests/{AppendToEventStoreParallel.cs => AppendToEventStoreWithManyWriters.cs} (81%) create mode 100644 tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs create mode 100644 tests/Benchmarks/Tests/TestData/MyEvent.cs create mode 100644 tests/Benchmarks/Tests/TestData/MyEventConsumer.cs diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventConsumerInfo.cs similarity index 95% rename from src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs rename to src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventConsumerInfo.cs index 2c7fec6d5..be37e18a7 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventConsumerInfo.cs @@ -10,7 +10,7 @@ using MongoDB.Bson; using MongoDB.Bson.Serialization.Attributes; using Squidex.Infrastructure.CQRS.Events; -namespace Squidex.Infrastructure.MongoDb +namespace Squidex.Infrastructure.MongoDb.EventStore { public sealed class MongoEventConsumerInfo : IEventConsumerInfo { diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventConsumerInfoRepository.cs similarity index 59% rename from src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs rename to src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventConsumerInfoRepository.cs index 2b359df62..666e09346 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventConsumerInfoRepository.cs @@ -16,10 +16,16 @@ using Squidex.Infrastructure.CQRS.Events; // ReSharper disable ConvertIfStatementToReturnStatement // ReSharper disable RedundantIfElseBlock -namespace Squidex.Infrastructure.MongoDb +namespace Squidex.Infrastructure.MongoDb.EventStore { public sealed class MongoEventConsumerInfoRepository : MongoRepositoryBase, IEventConsumerInfoRepository { + private static readonly FieldDefinition NameField = Fields.Build(x => x.Name); + private static readonly FieldDefinition ErrorField = Fields.Build(x => x.Error); + private static readonly FieldDefinition PositionField = Fields.Build(x => x.Position); + private static readonly FieldDefinition IsStoppedField = Fields.Build(x => x.IsStopped); + private static readonly FieldDefinition IsResettingField = Fields.Build(x => x.IsResetting); + public MongoEventConsumerInfoRepository(IMongoDatabase database) : base(database) { @@ -39,18 +45,18 @@ namespace Squidex.Infrastructure.MongoDb public async Task FindAsync(string consumerName) { - var entity = await Collection.Find(x => x.Name == consumerName).FirstOrDefaultAsync(); + var entity = await Collection.Find(Filter.Eq(NameField, consumerName)).FirstOrDefaultAsync(); return entity; } public async Task CreateAsync(string consumerName) { - if (await Collection.CountAsync(x => x.Name == consumerName) == 0) + if (await Collection.CountAsync(Filter.Eq(NameField, consumerName)) == 0) { try { - await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, Position = null }); + await Collection.InsertOneAsync(CreateEntity(consumerName, null)); } catch (MongoWriteException ex) { @@ -64,28 +70,36 @@ namespace Squidex.Infrastructure.MongoDb public Task StartAsync(string consumerName) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Unset(x => x.IsStopped)); + var filter = Filter.Eq(NameField, consumerName); + + return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField)); } public Task StopAsync(string consumerName, string error = null) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true).Set(x => x.Error, error)); + var filter = Filter.Eq(NameField, consumerName); + + return Collection.UpdateOneAsync(filter, Update.Set(IsStoppedField, true).Set(ErrorField, error)); } public Task ResetAsync(string consumerName) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true)); + var filter = Filter.Eq(NameField, consumerName); + + return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true)); } public Task SetPositionAsync(string consumerName, string position, bool reset) { + var filter = Filter.Eq(NameField, consumerName); + if (reset) { - return Collection.ReplaceOneAsync(x => x.Name == consumerName, CreateEntity(consumerName, position)); + return Collection.ReplaceOneAsync(filter, CreateEntity(consumerName, position)); } else { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.Position, position)); + return Collection.UpdateOneAsync(filter, Update.Set(PositionField, position)); } } diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs index f9ab7dec2..df6ea890b 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs @@ -6,15 +6,22 @@ // All rights reserved. // ========================================================================== +using MongoDB.Bson; +using MongoDB.Driver; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Timers; using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; -using MongoDB.Bson; -using MongoDB.Driver; -using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Tasks; +// ReSharper disable PossibleInvalidOperationException +// ReSharper disable EmptyGeneralCatchClause +// ReSharper disable AccessToModifiedClosure // ReSharper disable RedundantAssignment // ReSharper disable InvertIf // ReSharper disable ConvertIfStatementToConditionalTernaryExpression @@ -22,17 +29,32 @@ using Squidex.Infrastructure.CQRS.Events; namespace Squidex.Infrastructure.MongoDb.EventStore { - public class MongoEventStore : MongoRepositoryBase, IEventStore + public class MongoEventStore : MongoRepositoryBase, IEventStore, IDisposable { - private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); + private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); + private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); + private static readonly FieldDefinition EventStreamOffsetField = Fields.Build(x => x.EventStreamOffset); + private static readonly FieldDefinition EventStreamField = Fields.Build(x => x.EventStream); private readonly IEventNotifier notifier; + private readonly CompletionTimer timer; + private readonly ConcurrentQueue<(BsonDocument Document, TaskCompletionSource Completion)> pendingCommits = new ConcurrentQueue<(BsonDocument Document, TaskCompletionSource Completion)>(); + private readonly Lazy> plainCollection; - public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) + public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) : base(database) { Guard.NotNull(notifier, nameof(notifier)); this.notifier = notifier; + + timer = new CompletionTimer(50, ct => WriteAsync()); + + plainCollection = new Lazy>(() => Database.GetCollection(CollectionName())); + } + + public void Dispose() + { + timer.Dispose(); } protected override string CollectionName() @@ -45,10 +67,9 @@ namespace Squidex.Infrastructure.MongoDb.EventStore return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority }; } - protected override async Task SetupCollectionAsync(IMongoCollection collection) + protected override Task SetupCollectionAsync(IMongoCollection collection) { - await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)); - await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); + return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); } public IObservable GetEventsAsync(string streamFilter = null, string position = null) @@ -59,7 +80,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore { observer.OnNext(storedEvent); - return Tasks.TaskHelper.Done; + return TaskHelper.Done; }, ct, streamFilter, position); }); } @@ -68,54 +89,96 @@ namespace Squidex.Infrastructure.MongoDb.EventStore { Guard.NotNull(callback, nameof(callback)); - var tokenTimestamp = EmptyTimestamp; - var tokenCommitSize = -1; - var tokenCommitOffset = -1; - var isEndOfCommit = false; - - if (position != null) - { - var token = ParsePosition(position); - - tokenTimestamp = token.Timestamp; - tokenCommitSize = token.CommitSize; - tokenCommitOffset = token.CommitOffset; - - isEndOfCommit = tokenCommitOffset == tokenCommitSize - 1; + StreamPosition lastPosition = position; - if (isEndOfCommit) - { - tokenCommitOffset = -1; - } - } + var wasEndOfCommit = lastPosition.IsEndOfCommit; - var filter = CreateFilter(streamFilter, isEndOfCommit, tokenTimestamp); + var filter = CreateFilter(streamFilter, lastPosition); - await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit => + await Collection.Find(filter).Sort(Sort.Ascending(EventStreamField)).ForEachAsync(async commit => { - var eventStreamNumber = (int)commit.EventStreamOffset; + var eventStreamOffset = (int)commit.EventStreamOffset; + var commitTimestamp = commit.Timestamp; var commitOffset = 0; foreach (var e in commit.Events) { - eventStreamNumber++; + eventStreamOffset++; - if (commitOffset > tokenCommitOffset) + if (commitOffset > lastPosition.CommitOffset || wasEndOfCommit) { var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; - var eventToken = CreateToken(commit.Timestamp, commitOffset, commit.Events.Length); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - await callback(new StoredEvent(eventToken, eventStreamNumber, eventData)); + await callback(new StoredEvent(eventToken, eventStreamOffset, eventData)); commitOffset++; } - else + } + }, cancellationToken); + } + + private async Task WriteAsync() + { + while (true) + { + var commitsToInsert = new List<(BsonDocument Document, TaskCompletionSource Completion)>(); + + while (pendingCommits.TryDequeue(out var commit)) + { + commitsToInsert.Add(commit); + } + + var numCommits = commitsToInsert.Count; + + if (numCommits == 0) + { + return; + } + + try + { + await plainCollection.Value.InsertManyAsync(commitsToInsert.Select(x => x.Document), new InsertManyOptions { IsOrdered = false }); + + notifier.NotifyEventsStored(); + + foreach (var commit in commitsToInsert) + { + commit.Completion.SetResult(true); + } + } + catch (MongoBulkWriteException ex) + { + foreach (var error in ex.WriteErrors) { - break; + var commit = commitsToInsert[error.Index]; + + if (error.Category == ServerErrorCategory.DuplicateKey) + { + var streamName = commit.Document[nameof(MongoEventCommit.EventStream)].AsString; + var streamOffset = commit.Document[nameof(MongoEventCommit.EventStreamOffset)].AsInt64; + + var currentVersion = await GetEventStreamOffset(streamName); + + var exception = new WrongEventVersionException(currentVersion, streamOffset); + + commit.Completion.SetException(exception); + } + else + { + commit.Completion.SetException(new MongoWriteException(ex.ConnectionId, error, ex.WriteConcernError, ex)); + } } } - }, cancellationToken); + catch (Exception ex) + { + foreach (var commit in commitsToInsert) + { + commit.Completion.SetException(ex); + } + } + } } public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) @@ -138,76 +201,65 @@ namespace Squidex.Infrastructure.MongoDb.EventStore commitEvents[i++] = mongoEvent; } - var commit = new MongoEventCommit + var cts = new TaskCompletionSource(); + + var document = new MongoEventCommit { Id = commitId, Events = commitEvents, EventsCount = eventsCount, EventStream = streamName, - EventStreamOffset = expectedVersion, - Timestamp = EmptyTimestamp - }; - - try - { - await Collection.InsertOneAsync(commit); + EventStreamOffset = expectedVersion + }.ToBsonDocument(); - notifier.NotifyEventsStored(); - } - catch (MongoWriteException ex) - { - if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) - { - var currentVersion = await GetEventStreamOffset(streamName); + pendingCommits.Enqueue((document, cts)); - throw new WrongEventVersionException(currentVersion, expectedVersion); - } + timer.Trigger(); - throw; - } + await cts.Task; } } private async Task GetEventStreamOffset(string streamName) { var document = - await Collection.Find(x => x.EventStream == streamName) + await Collection.Find(Filter.Eq(EventStreamField, streamName)) .Project(Project - .Include(x => x.EventStreamOffset) - .Include(x => x.EventsCount)) - .SortByDescending(x => x.EventStreamOffset).Limit(1) + .Include(EventStreamOffsetField) + .Include(EventsCountField)) + .Sort(Sort.Descending(EventStreamOffsetField)).Limit(1) .FirstOrDefaultAsync(); if (document != null) { - return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64(); + return document[nameof(MongoEventCommit.EventStreamOffset)].ToInt64() + document[nameof(MongoEventCommit.EventsCount)].ToInt64(); } return -1; } - private static FilterDefinition CreateFilter(string streamFilter, bool isEndOfCommit, BsonTimestamp tokenTimestamp) + private static FilterDefinition CreateFilter(string streamFilter, StreamPosition streamPosition) { var filters = new List>(); - if (isEndOfCommit) + if (streamPosition.IsEndOfCommit) { - filters.Add(Filter.Gt(x => x.Timestamp, tokenTimestamp)); + filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp)); } else { - filters.Add(Filter.Gte(x => x.Timestamp, tokenTimestamp)); + filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp)); } if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase)) { if (streamFilter.Contains("^")) { - filters.Add(Filter.Regex(x => x.EventStream, streamFilter)); + filters.Add(Filter.Regex(EventStreamField, streamFilter)); } else { - filters.Add(Filter.Eq(x => x.EventStream, streamFilter)); + filters.Add(Filter.Eq(EventStreamField, streamFilter)); } } @@ -224,19 +276,5 @@ namespace Squidex.Infrastructure.MongoDb.EventStore return filter; } - - private static string CreateToken(BsonTimestamp timestamp, int commitOffset, int commitSize) - { - var parts = new object[] { timestamp.Timestamp, timestamp.Increment, commitOffset, commitSize }; - - return string.Join("-", parts); - } - - private static (BsonTimestamp Timestamp, int CommitOffset, int CommitSize) ParsePosition(string position) - { - var parts = position.Split('-'); - - return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2]), int.Parse(parts[3])); - } } -} +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/StreamPosition.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/StreamPosition.cs new file mode 100644 index 000000000..affaf81f6 --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/StreamPosition.cs @@ -0,0 +1,63 @@ +// ========================================================================== +// StreamPosition.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +// ReSharper disable InvertIf + +using MongoDB.Bson; + +namespace Squidex.Infrastructure.MongoDb.EventStore +{ + public sealed class StreamPosition + { + private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); + + public BsonTimestamp Timestamp { get; } + + public long CommitOffset { get; } + + public long CommitSize { get; } + + public bool IsEndOfCommit + { + get { return CommitOffset == CommitSize - 1; } + } + + public StreamPosition(BsonTimestamp timestamp, long commitOffset, long commitSize) + { + Timestamp = timestamp; + + CommitOffset = commitOffset; + CommitSize = commitSize; + } + + public static implicit operator string(StreamPosition position) + { + var parts = new object[] + { + position.Timestamp.Timestamp, + position.Timestamp.Increment, + position.CommitOffset, + position.CommitSize + }; + + return string.Join("-", parts); + } + + public static implicit operator StreamPosition(string position) + { + if (!string.IsNullOrWhiteSpace(position)) + { + var parts = position.Split('-'); + + return new StreamPosition(new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), long.Parse(parts[2]), long.Parse(parts[3])); + } + + return new StreamPosition(EmptyTimestamp, -1, -1); + } + } +} diff --git a/src/Squidex.Infrastructure.MongoDb/FieldDefinitionBuilder.cs b/src/Squidex.Infrastructure.MongoDb/FieldDefinitionBuilder.cs new file mode 100644 index 000000000..89fd15d3e --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/FieldDefinitionBuilder.cs @@ -0,0 +1,33 @@ +// ========================================================================== +// FieldDefinitionBuilder.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Linq.Expressions; +using MongoDB.Driver; + +namespace Squidex.Infrastructure.MongoDb +{ + public sealed class FieldDefinitionBuilder + { + public static readonly FieldDefinitionBuilder Instance = new FieldDefinitionBuilder(); + + private FieldDefinitionBuilder() + { + } + + public FieldDefinition Build(Expression> expression) + { + return new ExpressionFieldDefinition(expression); + } + + public FieldDefinition Build(string name) + { + return new StringFieldDefinition(name); + } + } +} diff --git a/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs b/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs index b4706cc76..7d5815ca2 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs @@ -18,72 +18,25 @@ namespace Squidex.Infrastructure.MongoDb public abstract class MongoRepositoryBase : IExternalSystem { private const string CollectionFormat = "{0}Set"; - private Lazy> mongoCollection; - private readonly IMongoDatabase mongoDatabase; - private readonly string typeName; - protected string TypeName - { - get - { - return typeName; - } - } + protected static readonly SortDefinitionBuilder Sort = Builders.Sort; + protected static readonly UpdateDefinitionBuilder Update = Builders.Update; + protected static readonly FieldDefinitionBuilder Fields = FieldDefinitionBuilder.Instance; + protected static readonly FilterDefinitionBuilder Filter = Builders.Filter; + protected static readonly IndexKeysDefinitionBuilder Index = Builders.IndexKeys; + protected static readonly ProjectionDefinitionBuilder Project = Builders.Projection; - protected static ProjectionDefinitionBuilder Project - { - get - { - return Builders.Projection; - } - } - - protected static SortDefinitionBuilder Sort - { - get - { - return Builders.Sort; - } - } - - protected static UpdateDefinitionBuilder Update - { - get - { - return Builders.Update; - } - } - - protected static FilterDefinitionBuilder Filter - { - get - { - return Builders.Filter; - } - } - - protected static IndexKeysDefinitionBuilder Index - { - get - { - return Builders.IndexKeys; - } - } + private Lazy> mongoCollection; + private readonly IMongoDatabase mongoDatabase; protected IMongoCollection Collection { - get - { - return mongoCollection.Value; - } + get { return mongoCollection.Value; } } protected IMongoDatabase Database { - get - { - return mongoDatabase; - } + get { return mongoDatabase; } } static MongoRepositoryBase() @@ -98,8 +51,6 @@ namespace Squidex.Infrastructure.MongoDb mongoDatabase = database; mongoCollection = CreateCollection(); - - typeName = GetType().Name; } protected virtual MongoCollectionSettings CollectionSettings() diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs index f3f6491eb..38ebc5478 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs @@ -17,7 +17,7 @@ namespace Squidex.Infrastructure.CQRS.Events { IObservable GetEventsAsync(string streamFilter = null, string position = null); - Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null); + Task GetEventsAsync(Func callback, CancellationToken cancellationToken = default(CancellationToken), string streamFilter = null, string position = null); Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events); } diff --git a/src/Squidex/Config/Domain/StoreMongoDbModule.cs b/src/Squidex/Config/Domain/StoreMongoDbModule.cs index 07015185a..bd119b2e6 100644 --- a/src/Squidex/Config/Domain/StoreMongoDbModule.cs +++ b/src/Squidex/Config/Domain/StoreMongoDbModule.cs @@ -30,7 +30,7 @@ using Squidex.Domain.Users.MongoDb; using Squidex.Domain.Users.MongoDb.Infrastructure; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.MongoDb; +using Squidex.Infrastructure.MongoDb.EventStore; using Squidex.Infrastructure.MongoDb.UsageTracker; using Squidex.Infrastructure.UsageTracking; using Squidex.Shared.Users; diff --git a/src/Squidex/Controllers/ContentApi/ContentsController.cs b/src/Squidex/Controllers/ContentApi/ContentsController.cs index 5a4b90320..9c721835a 100644 --- a/src/Squidex/Controllers/ContentApi/ContentsController.cs +++ b/src/Squidex/Controllers/ContentApi/ContentsController.cs @@ -18,7 +18,6 @@ using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Read.Contents.GraphQL; using Squidex.Domain.Apps.Read.Contents.Repositories; using Squidex.Domain.Apps.Read.Schemas; -using Squidex.Domain.Apps.Read.Schemas.Repositories; using Squidex.Domain.Apps.Read.Schemas.Services; using Squidex.Domain.Apps.Write.Contents.Commands; using Squidex.Infrastructure.CQRS.Commands; diff --git a/tests/Benchmarks/Program.cs b/tests/Benchmarks/Program.cs index bb6fb6289..d13f82b6b 100644 --- a/tests/Benchmarks/Program.cs +++ b/tests/Benchmarks/Program.cs @@ -19,8 +19,9 @@ namespace Benchmarks private static readonly List Benchmarks = new List { new AppendToEventStore(), - new AppendToEventStoreParallel(), - new HandleEvents() + new AppendToEventStoreWithManyWriters(), + new HandleEvents(), + new HandleEventsWithManyWriters() }; public static void Main(string[] args) @@ -49,6 +50,8 @@ namespace Benchmarks var elapsed = 0d; var count = 0L; + Console.WriteLine($"{benchmark.Name}: Initialized"); + benchmark.Initialize(); for (var run = 0; run < numRuns; run++) @@ -64,6 +67,8 @@ namespace Benchmarks watch.Stop(); elapsed += watch.ElapsedMilliseconds; + + Console.WriteLine($"{benchmark.Name}: Run {run + 1} finished"); } finally { @@ -74,7 +79,7 @@ namespace Benchmarks var averageElapsed = TimeSpan.FromMilliseconds(elapsed / numRuns); var averageSeconds = Math.Round(count / (numRuns * averageElapsed.TotalSeconds), 2); - Console.WriteLine($"{benchmark.Name} completed after {averageElapsed}, {averageSeconds} items/s"); + Console.WriteLine($"{benchmark.Name}: Completed after {averageElapsed}, {averageSeconds} items/s"); } catch (Exception e) { diff --git a/tests/Benchmarks/Properties/launchSettings.json b/tests/Benchmarks/Properties/launchSettings.json index 701455407..5d43f5fe1 100644 --- a/tests/Benchmarks/Properties/launchSettings.json +++ b/tests/Benchmarks/Properties/launchSettings.json @@ -2,7 +2,7 @@ "profiles": { "Benchmarks": { "commandName": "Project", - "commandLineArgs": "handleEvents" + "commandLineArgs": "appendToEventStoreParallel" } } } \ No newline at end of file diff --git a/tests/Benchmarks/Tests/AppendToEventStore.cs b/tests/Benchmarks/Tests/AppendToEventStore.cs index 6e5618ea1..f5ee53e47 100644 --- a/tests/Benchmarks/Tests/AppendToEventStore.cs +++ b/tests/Benchmarks/Tests/AppendToEventStore.cs @@ -12,6 +12,7 @@ using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.MongoDb.EventStore; +using Squidex.Infrastructure.Tasks; namespace Benchmarks.Tests { @@ -28,7 +29,7 @@ namespace Benchmarks.Tests public string Name { - get { return "Append Events to EventStore"; } + get { return "Append events"; } } public void Initialize() @@ -41,14 +42,15 @@ namespace Benchmarks.Tests mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub())); + eventStore.GetEventsAsync(x => TaskHelper.Done).Wait(); } public long Run() { - const long numCommits = 200; - const long eventStreams = 10; + const long numCommits = 100; + const long numStreams = 20; - for (var streamId = 0; streamId < eventStreams; streamId++) + for (var streamId = 0; streamId < numStreams; streamId++) { var eventOffset = -1; var streamName = streamId.ToString(); @@ -56,12 +58,11 @@ namespace Benchmarks.Tests for (var commitId = 0; commitId < numCommits; commitId++) { eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait(); - eventOffset++; } } - return numCommits * eventStreams; + return numCommits * numStreams; } public void RunCleanup() diff --git a/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs b/tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs similarity index 81% rename from tests/Benchmarks/Tests/AppendToEventStoreParallel.cs rename to tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs index 59cb09880..1a62c3eba 100644 --- a/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs +++ b/tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs @@ -1,5 +1,5 @@ // ========================================================================== -// AppendToEventStoreParallel.cs +// AppendToEventStoreWithManyWriters.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -13,10 +13,11 @@ using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.MongoDb.EventStore; +using Squidex.Infrastructure.Tasks; namespace Benchmarks.Tests { - public sealed class AppendToEventStoreParallel : IBenchmark + public sealed class AppendToEventStoreWithManyWriters : IBenchmark { private IMongoClient mongoClient; private IMongoDatabase mongoDatabase; @@ -29,7 +30,7 @@ namespace Benchmarks.Tests public string Name { - get { return "Append Events to EventStore Parallel"; } + get { return "Append events parallel"; } } public void Initialize() @@ -42,14 +43,15 @@ namespace Benchmarks.Tests mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub())); + eventStore.GetEventsAsync(x => TaskHelper.Done).Wait(); } public long Run() { const long numCommits = 200; - const long eventStreams = 10; + const long numStreams = 100; - Parallel.For(0, eventStreams, streamId => + Parallel.For(0, numStreams, streamId => { var eventOffset = -1; var streamName = streamId.ToString(); @@ -57,12 +59,11 @@ namespace Benchmarks.Tests for (var commitId = 0; commitId < numCommits; commitId++) { eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait(); - eventOffset++; } }); - - return numCommits * eventStreams; + + return numCommits * numStreams; } public void RunCleanup() diff --git a/tests/Benchmarks/Tests/HandleEvents.cs b/tests/Benchmarks/Tests/HandleEvents.cs index 6c5289571..70794c198 100644 --- a/tests/Benchmarks/Tests/HandleEvents.cs +++ b/tests/Benchmarks/Tests/HandleEvents.cs @@ -7,15 +7,13 @@ // ========================================================================== using System; -using System.Collections.Generic; -using System.Threading.Tasks; +using Benchmarks.Tests.TestData; using MongoDB.Driver; using Newtonsoft.Json; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.Json; using Squidex.Infrastructure.Log; -using Squidex.Infrastructure.MongoDb; using Squidex.Infrastructure.MongoDb.EventStore; using Squidex.Infrastructure.Tasks; @@ -25,60 +23,6 @@ namespace Benchmarks.Tests { public sealed class HandleEvents : IBenchmark { - [TypeName("MyEvent")] - public sealed class MyEvent : IEvent - { - public int EventNumber { get; set; } - } - - public sealed class MyEventConsumer : IEventConsumer - { - private readonly TaskCompletionSource completion = new TaskCompletionSource(); - private readonly int numEvents; - - public List EventNumbers { get; } = new List(); - - public string Name - { - get { return typeof(MyEventConsumer).Name; } - } - - public string EventsFilter - { - get { return string.Empty; } - } - - public MyEventConsumer(int numEvents) - { - this.numEvents = numEvents; - } - - public Task ClearAsync() - { - return TaskHelper.Done; - } - - public void Wait() - { - completion.Task.Wait(); - } - - public Task On(Envelope @event) - { - if (@event.Payload is MyEvent myEvent) - { - EventNumbers.Add(myEvent.EventNumber); - - if (myEvent.EventNumber == numEvents) - { - completion.SetResult(true); - } - } - - return TaskHelper.Done; - } - } - private readonly TypeNameRegistry typeNameRegistry = new TypeNameRegistry().Map(typeof(MyEvent)); private readonly EventDataFormatter formatter; private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings(); @@ -98,7 +42,7 @@ namespace Benchmarks.Tests public string Name { - get { return "HandleEvents"; } + get { return "Handle Events"; } } public HandleEvents() @@ -120,9 +64,11 @@ namespace Benchmarks.Tests var log = new SemanticLog(new ILogChannel[0], new ILogAppender[0], () => new JsonLogWriter(Formatting.Indented, true)); eventConsumerInfos = new MongoEventConsumerInfoRepository(mongoDatabase); + eventConsumer = new MyEventConsumer(NumEvents); eventNotifier = new DefaultEventNotifier(new InMemoryPubSub()); + eventStore = new MongoEventStore(mongoDatabase, eventNotifier); - eventConsumer = new MyEventConsumer(NumEvents); + eventStore.GetEventsAsync(x => TaskHelper.Done).Wait(); eventReceiver = new EventReceiver(formatter, eventStore, eventNotifier, eventConsumerInfos, log); eventReceiver.Subscribe(eventConsumer); @@ -139,22 +85,7 @@ namespace Benchmarks.Tests eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventId - 1, new [] { eventData }).Wait(); } - eventConsumer.Wait(); - - if (eventConsumer.EventNumbers.Count != NumEvents) - { - throw new InvalidOperationException($"{eventConsumer.EventNumbers.Count} Events have been handled"); - } - - for (var i = 0; i < eventConsumer.EventNumbers.Count; i++) - { - var value = eventConsumer.EventNumbers[i]; - - if (value != i + 1) - { - throw new InvalidOperationException($"Event[{i}] != value"); - } - } + eventConsumer.WaitAndVerify(); return NumEvents; } diff --git a/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs b/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs new file mode 100644 index 000000000..e71364e91 --- /dev/null +++ b/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs @@ -0,0 +1,111 @@ +// ========================================================================== +// HandleEventsWithManyWriters.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using Benchmarks.Tests.TestData; +using MongoDB.Driver; +using Newtonsoft.Json; +using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Json; +using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.MongoDb.EventStore; +using Squidex.Infrastructure.Tasks; + +// ReSharper disable InvertIf + +namespace Benchmarks.Tests +{ + public sealed class HandleEventsWithManyWriters : IBenchmark + { + private readonly TypeNameRegistry typeNameRegistry = new TypeNameRegistry().Map(typeof(MyEvent)); + private readonly EventDataFormatter formatter; + private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings(); + private const int NumCommits = 200; + private const int NumStreams = 10; + private IMongoClient mongoClient; + private IMongoDatabase mongoDatabase; + private IEventStore eventStore; + private IEventNotifier eventNotifier; + private IEventConsumerInfoRepository eventConsumerInfos; + private EventReceiver eventReceiver; + private MyEventConsumer eventConsumer; + + public string Id + { + get { return "handleEventsParallel"; } + } + + public string Name + { + get { return "Handle events parallel"; } + } + + public HandleEventsWithManyWriters() + { + serializerSettings.Converters.Add(new PropertiesBagConverter()); + + formatter = new EventDataFormatter(typeNameRegistry, serializerSettings); + } + + public void Initialize() + { + mongoClient = new MongoClient("mongodb://localhost"); + } + + public void RunInitialize() + { + mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); + + var log = new SemanticLog(new ILogChannel[0], new ILogAppender[0], () => new JsonLogWriter(Formatting.Indented, true)); + + eventConsumerInfos = new MongoEventConsumerInfoRepository(mongoDatabase); + eventConsumer = new MyEventConsumer(NumStreams * NumCommits); + eventNotifier = new DefaultEventNotifier(new InMemoryPubSub()); + + eventStore = new MongoEventStore(mongoDatabase, eventNotifier); + eventStore.GetEventsAsync(x => TaskHelper.Done).Wait(); + + eventReceiver = new EventReceiver(formatter, eventStore, eventNotifier, eventConsumerInfos, log); + eventReceiver.Subscribe(eventConsumer); + } + + public long Run() + { + Parallel.For(0, NumStreams, streamId => + { + var eventOffset = -1; + var streamName = streamId.ToString(); + + for (var commitId = 0; commitId < NumCommits; commitId++) + { + var eventData = formatter.ToEventData(new Envelope(new MyEvent()), Guid.NewGuid()); + + eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset - 1, new[] { eventData }).Wait(); + eventOffset++; + } + }); + + eventConsumer.WaitAndVerify(); + + return NumStreams * NumCommits; + } + + public void RunCleanup() + { + mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName); + + eventReceiver.Dispose(); + } + + public void Cleanup() + { + } + } +} diff --git a/tests/Benchmarks/Tests/TestData/MyEvent.cs b/tests/Benchmarks/Tests/TestData/MyEvent.cs new file mode 100644 index 000000000..6bfccb29d --- /dev/null +++ b/tests/Benchmarks/Tests/TestData/MyEvent.cs @@ -0,0 +1,19 @@ +// ========================================================================== +// MyEvent.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS.Events; + +namespace Benchmarks.Tests.TestData +{ + [TypeName("MyEvent")] + public sealed class MyEvent : IEvent + { + public int EventNumber { get; set; } + } +} \ No newline at end of file diff --git a/tests/Benchmarks/Tests/TestData/MyEventConsumer.cs b/tests/Benchmarks/Tests/TestData/MyEventConsumer.cs new file mode 100644 index 000000000..a8f3fb78c --- /dev/null +++ b/tests/Benchmarks/Tests/TestData/MyEventConsumer.cs @@ -0,0 +1,81 @@ +// ========================================================================== +// MyEventConsumer.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Tasks; + +// ReSharper disable InvertIf + +namespace Benchmarks.Tests.TestData +{ + public sealed class MyEventConsumer : IEventConsumer + { + private readonly TaskCompletionSource completion = new TaskCompletionSource(); + private readonly int numEvents; + + public List EventNumbers { get; } = new List(); + + public string Name + { + get { return typeof(MyEventConsumer).Name; } + } + + public string EventsFilter + { + get { return string.Empty; } + } + + public MyEventConsumer(int numEvents) + { + this.numEvents = numEvents; + } + + public Task ClearAsync() + { + return TaskHelper.Done; + } + + public void WaitAndVerify() + { + completion.Task.Wait(); + + if (EventNumbers.Count != numEvents) + { + throw new InvalidOperationException($"{EventNumbers.Count} Events have been handled"); + } + + for (var i = 0; i < EventNumbers.Count; i++) + { + var value = EventNumbers[i]; + + if (value != i + 1) + { + throw new InvalidOperationException($"Event[{i}] != value"); + } + } + } + + public Task On(Envelope @event) + { + if (@event.Payload is MyEvent myEvent) + { + EventNumbers.Add(myEvent.EventNumber); + + if (myEvent.EventNumber == numEvents) + { + completion.SetResult(true); + } + } + + return TaskHelper.Done; + } + } +} \ No newline at end of file diff --git a/tests/Squidex.Domain.Apps.Read.Tests/Contents/GraphQLTests.cs b/tests/Squidex.Domain.Apps.Read.Tests/Contents/GraphQLTests.cs index bb7fc7132..445b34fe5 100644 --- a/tests/Squidex.Domain.Apps.Read.Tests/Contents/GraphQLTests.cs +++ b/tests/Squidex.Domain.Apps.Read.Tests/Contents/GraphQLTests.cs @@ -310,7 +310,7 @@ namespace Squidex.Domain.Apps.Read.Contents contentRepository.VerifyAll(); } - [Fact]] + [Fact] public async Task Should_return_single_content_when_finding_content() { var contentId = Guid.NewGuid();