From d90e5efc87d7a2aee8fcd636539fccb7a1fe5915 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Tue, 8 Aug 2017 18:15:45 +0200 Subject: [PATCH 1/3] Test --- .../CQRS/Events/GetEventStore.cs | 16 +++- .../CQRS/Events/MongoEventStore.cs | 94 ++++++++++++++----- .../CQRS/Events/IEventStore.cs | 2 + .../AppendToEventStoreWithManyWriters.cs | 4 +- tests/Benchmarks/Utils/Helper.cs | 2 +- .../CQRS/Events/EventReceiverTests.cs | 5 + 6 files changed, 92 insertions(+), 31 deletions(-) diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs index 7bce0375b..68dcdddeb 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs @@ -80,10 +80,22 @@ namespace Squidex.Infrastructure.CQRS.Events return result; } - public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) + public Task AppendEventsAsync(Guid commitId, string streamName, ICollection events) + { + return AppendEventsInternalAsync(streamName, ExpectedVersion.Any, events); + } + + public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) + { + Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion)); + + return AppendEventsInternalAsync(streamName, expectedVersion, events); + } + + private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection events) { - Guard.NotNull(events, nameof(events)); Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); if (events.Count == 0) { diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs index c67ce70cc..3cdbf6dc3 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs @@ -25,6 +25,8 @@ namespace Squidex.Infrastructure.CQRS.Events { public class MongoEventStore : MongoRepositoryBase, IEventStore { + private const long AnyVersion = long.MinValue; + private const int AppendTimeoutMs = 1000000; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); @@ -107,39 +109,44 @@ namespace Squidex.Infrastructure.CQRS.Events }, cancellationToken); } - public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) + public Task AppendEventsAsync(Guid commitId, string streamName, ICollection events) + { + return AppendEventsInternalAsync(commitId, streamName, AnyVersion, events); + } + + public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) + { + Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion)); + + return AppendEventsInternalAsync(commitId, streamName, expectedVersion, events); + } + + private async Task AppendEventsInternalAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); Guard.NotNull(events, nameof(events)); - var eventsCount = events.Count; - - if (eventsCount > 0) + if (events.Count == 0) { - var commitEvents = new MongoEvent[events.Count]; + return; + } - var i = 0; + var currentVersion = await GetEventStreamOffset(streamName); - foreach (var e in events) - { - var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; + if (expectedVersion != AnyVersion && expectedVersion != currentVersion) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } - commitEvents[i++] = mongoEvent; - } + var cts = new CancellationTokenSource(AppendTimeoutMs); + + var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); + while (!cts.IsCancellationRequested) + { try { - var document = new MongoEventCommit - { - Id = commitId, - Events = commitEvents, - EventsCount = eventsCount, - EventStream = streamName, - EventStreamOffset = expectedVersion, - Timestamp = EmptyTimestamp - }; - - await Collection.InsertOneAsync(document); + await Collection.InsertOneAsync(commit, new InsertOneOptions(), cts.Token); notifier.NotifyEventsStored(); } @@ -147,9 +154,20 @@ namespace Squidex.Infrastructure.CQRS.Events { if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) { - var currentVersion = await GetEventStreamOffset(streamName); - - throw new WrongEventVersionException(currentVersion, expectedVersion); + currentVersion = await GetEventStreamOffset(streamName); + + if (expectedVersion != AnyVersion) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + else if (!cts.IsCancellationRequested) + { + commit.EventStreamOffset = currentVersion; + } + else + { + throw new TaskCanceledException("Could not acquire a free slot for the commit within the provided time."); + } } else { @@ -204,5 +222,31 @@ namespace Squidex.Infrastructure.CQRS.Events return Filter.And(filters); } + + private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + var commitEvents = new MongoEvent[events.Count]; + + var i = 0; + + foreach (var e in events) + { + var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; + + commitEvents[i++] = mongoEvent; + } + + var mongoCommit = new MongoEventCommit + { + Id = commitId, + Events = commitEvents, + EventsCount = events.Count, + EventStream = streamName, + EventStreamOffset = expectedVersion, + Timestamp = EmptyTimestamp + }; + + return mongoCommit; + } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs index 8a4d10b15..ae6b265d4 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs @@ -16,6 +16,8 @@ namespace Squidex.Infrastructure.CQRS.Events { Task> GetEventsAsync(string streamName); + Task AppendEventsAsync(Guid commitId, string streamName, ICollection events); + Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events); IEventSubscription CreateSubscription(string streamFilter = null, string position = null); diff --git a/tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs b/tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs index 2f1bf6599..d76684f3d 100644 --- a/tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs +++ b/tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs @@ -51,13 +51,11 @@ namespace Benchmarks.Tests Parallel.For(0, numStreams, streamId => { - var eventOffset = -1; var streamName = streamId.ToString(); for (var commitId = 0; commitId < numCommits; commitId++) { - eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait(); - eventOffset++; + eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, new[] { Helper.CreateEventData() }).Wait(); } }); diff --git a/tests/Benchmarks/Utils/Helper.cs b/tests/Benchmarks/Utils/Helper.cs index c73db2af3..c65d12d1b 100644 --- a/tests/Benchmarks/Utils/Helper.cs +++ b/tests/Benchmarks/Utils/Helper.cs @@ -21,7 +21,7 @@ namespace Benchmarks.Utils public static void Warmup(this IEventStore eventStore) { - eventStore.AppendEventsAsync(Guid.NewGuid(), "my-stream", -1, new List { CreateEventData() }).Wait(); + eventStore.AppendEventsAsync(Guid.NewGuid(), "my-stream", new List { CreateEventData() }).Wait(); } } } diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs index 2b4fa2b53..a6f471d7e 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs @@ -88,6 +88,11 @@ namespace Squidex.Infrastructure.CQRS.Events throw new NotSupportedException(); } + public Task AppendEventsAsync(Guid commitId, string streamName, ICollection events) + { + throw new NotSupportedException(); + } + public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) { throw new NotSupportedException(); From fd3c13d8b9a7ee0563ea515ce831fca0a40c53e2 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Tue, 8 Aug 2017 18:41:32 +0200 Subject: [PATCH 2/3] Temp --- .gitignore | 1 + .../CQRS/Events/MongoEventStore.cs | 4 ++- src/Squidex/Properties/launchSettings.json | 28 ------------------- 3 files changed, 4 insertions(+), 29 deletions(-) delete mode 100644 src/Squidex/Properties/launchSettings.json diff --git a/.gitignore b/.gitignore index dc1259a51..7cce4633a 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ node_modules/ /src/Squidex/appsettings.Development.json /src/Squidex/Assets +/src/Squidex/Properties/launchSettings.json diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs index 3cdbf6dc3..9cb91e7cb 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs @@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.CQRS.Events public class MongoEventStore : MongoRepositoryBase, IEventStore { private const long AnyVersion = long.MinValue; - private const int AppendTimeoutMs = 1000000; + private const int AppendTimeoutMs = 5000; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); @@ -149,6 +149,8 @@ namespace Squidex.Infrastructure.CQRS.Events await Collection.InsertOneAsync(commit, new InsertOneOptions(), cts.Token); notifier.NotifyEventsStored(); + + return; } catch (MongoWriteException ex) { diff --git a/src/Squidex/Properties/launchSettings.json b/src/Squidex/Properties/launchSettings.json deleted file mode 100644 index 32dcc5edc..000000000 --- a/src/Squidex/Properties/launchSettings.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:5000/", - "sslPort": 0 - } - }, - "profiles": { - "IIS Express": { - "commandName": "IISExpress", - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - }, - "Squidex": { - "commandName": "Project", - "launchUrl": "http://localhost:5000", - "environmentVariables": { - "IDENTITY__ADMINEMAIL": "sebastian@squidex.io", - "IDENTITY__ADMINPASSWORD": "1Xhn2a3b4!?", - "ASPNETCORE_ENVIRONMENT": "Development" - }, - "applicationUrl": "http://localhost:5000" - } - } -} \ No newline at end of file From f18b03058370be3074618fd4f6c2fd47f3ff2052 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Tue, 8 Aug 2017 19:17:30 +0200 Subject: [PATCH 3/3] Mongo Event store improved. --- .../CQRS/Events/MongoEventStore.cs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs index 9cb91e7cb..23710d3c5 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs @@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.CQRS.Events public class MongoEventStore : MongoRepositoryBase, IEventStore { private const long AnyVersion = long.MinValue; - private const int AppendTimeoutMs = 5000; + private const int MaxAttempts = 20; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); @@ -52,9 +52,10 @@ namespace Squidex.Infrastructure.CQRS.Events return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority }; } - protected override Task SetupCollectionAsync(IMongoCollection collection) + protected override async Task SetupCollectionAsync(IMongoCollection collection) { - return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); + await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)); + await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true }); } public IEventSubscription CreateSubscription(string streamFilter = null, string position = null) @@ -138,15 +139,13 @@ namespace Squidex.Infrastructure.CQRS.Events throw new WrongEventVersionException(currentVersion, expectedVersion); } - var cts = new CancellationTokenSource(AppendTimeoutMs); - var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); - while (!cts.IsCancellationRequested) + for (var attempt = 0; attempt < MaxAttempts; attempt++) { try { - await Collection.InsertOneAsync(commit, new InsertOneOptions(), cts.Token); + await Collection.InsertOneAsync(commit); notifier.NotifyEventsStored(); @@ -162,13 +161,13 @@ namespace Squidex.Infrastructure.CQRS.Events { throw new WrongEventVersionException(currentVersion, expectedVersion); } - else if (!cts.IsCancellationRequested) + else if (attempt < MaxAttempts) { - commit.EventStreamOffset = currentVersion; + expectedVersion = currentVersion; } else { - throw new TaskCanceledException("Could not acquire a free slot for the commit within the provided time."); + throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); } } else