From a2d04e9acd7f250c03a4c3d08c4412df475a1819 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Wed, 5 Jul 2017 22:24:50 +0200 Subject: [PATCH] Benchmarks and performance improvements. --- Squidex.sln | 17 ++ src/Squidex.Core/Squidex.Core.csproj | 2 +- src/Squidex.Events/Squidex.Events.csproj | 1 + .../Squidex.Infrastructure.GoogleCloud.csproj | 3 +- .../EventStore/MongoEventCommit.cs | 13 +- .../EventStore/MongoEventStore.cs | 164 +++++++----------- .../MongoEventConsumerInfo.cs | 2 +- .../MongoEventConsumerInfoRepository.cs | 20 +-- .../Squidex.Infrastructure.MongoDb.csproj | 1 + .../Squidex.Infrastructure.RabbitMq.csproj | 1 + .../Squidex.Infrastructure.Redis.csproj | 3 +- .../CQRS/Events/EnvelopeExtensions.cs | 6 +- .../CQRS/Events/EventReceiver.cs | 30 ++-- .../CQRS/Events/IEventConsumerInfo.cs | 4 +- .../Events/IEventConsumerInfoRepository.cs | 2 +- .../CQRS/Events/IEventStore.cs | 6 +- .../CQRS/Events/StoredEvent.cs | 21 +-- .../Squidex.Infrastructure.csproj | 3 +- .../Squidex.Read.MongoDb.csproj | 1 + src/Squidex.Read/Squidex.Read.csproj | 1 + src/Squidex.Write/Squidex.Write.csproj | 1 + src/Squidex/Squidex.csproj | 9 +- tests/Benchmarks/Benchmarks.csproj | 1 + tests/Benchmarks/IBenchmark.cs | 1 - tests/Benchmarks/Program.cs | 13 +- .../Benchmarks/Properties/launchSettings.json | 2 +- tests/Benchmarks/Tests/AppendToEventStore.cs | 17 +- .../Tests/AppendToEventStoreParallel.cs | 76 ++++++++ tests/Benchmarks/Utils/Helper.cs | 22 +++ tests/RunCoverage.ps1 | 2 +- .../Squidex.Core.Tests.csproj | 5 +- .../DefaultDomainObjectRepositoryTests.cs | 16 +- .../CQRS/Events/EnvelopeExtensionsTests.cs | 8 +- .../CQRS/Events/EventDataFormatterTests.cs | 2 +- .../CQRS/Events/EventReceiverTests.cs | 26 +-- .../Squidex.Infrastructure.Tests.csproj | 5 +- .../Squidex.Read.Tests.csproj | 5 +- .../Squidex.Write.Tests.csproj | 5 +- tools/Migrate_01/Migrate_01.csproj | 9 + tools/Migrate_01/Program.cs | 94 ++++++++++ 40 files changed, 405 insertions(+), 215 deletions(-) create mode 100644 tests/Benchmarks/Tests/AppendToEventStoreParallel.cs create mode 100644 tests/Benchmarks/Utils/Helper.cs create mode 100644 tools/Migrate_01/Migrate_01.csproj create mode 100644 tools/Migrate_01/Program.cs diff --git a/Squidex.sln b/Squidex.sln index 2165f46ce..385ea40c2 100644 --- a/Squidex.sln +++ b/Squidex.sln @@ -40,6 +40,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{B56EBCEC EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmarks", "tests\Benchmarks\Benchmarks.csproj", "{D48A03DF-BCD3-4667-8747-2F251347E2B6}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "migrations", "migrations", "{94207AA6-4923-4183-A558-E0F8196B8CA3}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Migrate_01", "tools\Migrate_01\Migrate_01.csproj", "{B51126A8-0D75-4A79-867D-10724EC6AC84}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -190,6 +194,18 @@ Global {D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x64.Build.0 = Release|Any CPU {D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.ActiveCfg = Release|Any CPU {D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.Build.0 = Release|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x64.ActiveCfg = Debug|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x64.Build.0 = Debug|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x86.ActiveCfg = Debug|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x86.Build.0 = Debug|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|Any CPU.Build.0 = Release|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x64.ActiveCfg = Release|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x64.Build.0 = Release|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x86.ActiveCfg = Release|Any CPU + {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -210,5 +226,6 @@ Global {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {945871B1-77B8-43FB-B53C-27CF385AB756} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {D48A03DF-BCD3-4667-8747-2F251347E2B6} = {B56EBCEC-9C50-46A7-848C-65502DE69C5C} + {B51126A8-0D75-4A79-867D-10724EC6AC84} = {94207AA6-4923-4183-A558-E0F8196B8CA3} EndGlobalSection EndGlobal diff --git a/src/Squidex.Core/Squidex.Core.csproj b/src/Squidex.Core/Squidex.Core.csproj index bb10ba971..d8a4583d2 100644 --- a/src/Squidex.Core/Squidex.Core.csproj +++ b/src/Squidex.Core/Squidex.Core.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/Squidex.Events/Squidex.Events.csproj b/src/Squidex.Events/Squidex.Events.csproj index 7a80cf949..ab936853b 100644 --- a/src/Squidex.Events/Squidex.Events.csproj +++ b/src/Squidex.Events/Squidex.Events.csproj @@ -13,5 +13,6 @@ + diff --git a/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj b/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj index e3b4db52b..8fd29a45b 100644 --- a/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj +++ b/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj @@ -7,7 +7,8 @@ True - + + diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs index 7c4cabf31..6b1fefc86 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs @@ -10,7 +10,6 @@ using System; using System.Collections.Generic; using MongoDB.Bson; using MongoDB.Bson.Serialization.Attributes; -using NodaTime; namespace Squidex.Infrastructure.MongoDb.EventStore { @@ -23,15 +22,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore [BsonRequired] [BsonElement] - public Instant Timestamp { get; set; } + public BsonTimestamp Timestamp { get; set; } [BsonElement] [BsonRequired] - public List Events { get; set; } - - [BsonElement] - [BsonRequired] - public long EventsOffset { get; set; } + public MongoEvent[] Events { get; set; } [BsonElement] [BsonRequired] @@ -39,10 +34,10 @@ namespace Squidex.Infrastructure.MongoDb.EventStore [BsonElement] [BsonRequired] - public string EventStream { get; set; } + public long EventsCount { get; set; } [BsonElement] [BsonRequired] - public long EventsCount { get; set; } + public string EventStream { get; set; } } } diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs index c23adfade..e10f27d30 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs @@ -8,15 +8,12 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; using MongoDB.Driver; -using NodaTime; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.Reflection; // ReSharper disable ConvertIfStatementToConditionalTernaryExpression // ReSharper disable ClassNeverInstantiated.Local @@ -28,18 +25,14 @@ namespace Squidex.Infrastructure.MongoDb.EventStore public class MongoEventStore : MongoRepositoryBase, IEventStore { private const int Retries = 500; + private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private readonly IEventNotifier notifier; - private readonly IClock clock; - private string eventsOffsetIndex; - public MongoEventStore(IMongoDatabase database, IEventNotifier notifier, IClock clock) + public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) : base(database) { - Guard.NotNull(clock, nameof(clock)); Guard.NotNull(notifier, nameof(notifier)); - this.clock = clock; - this.notifier = notifier; } @@ -55,17 +48,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore protected override async Task SetupCollectionAsync(IMongoCollection collection) { - var indexNames = - await Task.WhenAll( - collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }), - collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }), - collection.Indexes.CreateOneAsync(Index.Descending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }), - collection.Indexes.CreateOneAsync(Index.Descending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true })); - - eventsOffsetIndex = indexNames[0]; + await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Ascending(x => x.Timestamp)); + await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); } - public IObservable GetEventsAsync(string streamFilter, long lastReceivedEventNumber = -1) + public IObservable GetEventsAsync(string streamFilter = null, string position = null) { return Observable.Create((observer, ct) => { @@ -74,22 +61,29 @@ namespace Squidex.Infrastructure.MongoDb.EventStore observer.OnNext(storedEvent); return Tasks.TaskHelper.Done; - }, ct, streamFilter, lastReceivedEventNumber); + }, ct, streamFilter, position); }); } - public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1) + public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) { Guard.NotNull(callback, nameof(callback)); - - var filters = new List>(); - if (lastReceivedEventNumber >= 0) + var tokenTimestamp = EmptyTimestamp; + var tokenEventStreamNumber = -1; + + if (position != null) { - var commitOffset = await GetPreviousOffsetAsync(lastReceivedEventNumber); + var token = ParsePosition(position); - filters.Add(Filter.Gte(x => x.EventsOffset, commitOffset)); + tokenTimestamp = token.Timestamp; + tokenEventStreamNumber = token.EventStreamNumber; } + + var filters = new List> + { + Filter.Gte(x => x.Timestamp, tokenTimestamp) + }; if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase)) { @@ -114,140 +108,106 @@ namespace Squidex.Infrastructure.MongoDb.EventStore filter = filters[0]; } - await Collection.Find(filter).SortBy(x => x.EventsOffset).ForEachAsync(async commit => + await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit => { - var eventNumber = commit.EventsOffset; - var eventStreamNumber = commit.EventStreamOffset; + var eventStreamNumber = (int)commit.EventStreamOffset; - foreach (var mongoEvent in commit.Events) + foreach (var e in commit.Events) { - eventNumber++; eventStreamNumber++; - if (eventNumber > lastReceivedEventNumber) + if (eventStreamNumber > tokenEventStreamNumber) { - var eventData = SimpleMapper.Map(mongoEvent, new EventData()); + var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; + var eventToken = CreateToken(commit.Timestamp, eventStreamNumber); - await callback(new StoredEvent(eventNumber, eventStreamNumber, eventData)); + await callback(new StoredEvent(eventToken, eventStreamNumber, eventData)); } } }, cancellationToken); } - public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events) + public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); Guard.NotNull(events, nameof(events)); - var currentVersion = await GetEventStreamOffset(streamName); + var eventsCount = events.Count; - if (currentVersion != expectedVersion) + if (eventsCount > 0) { - throw new WrongEventVersionException(currentVersion, expectedVersion); - } + var commitEvents = new MongoEvent[events.Count]; - var now = clock.GetCurrentInstant(); + var i = 0; - var commitEvents = events.Select(x => SimpleMapper.Map(x, new MongoEvent())).ToList(); + foreach (var e in events) + { + var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; - if (commitEvents.Any()) - { - var offset = await GetEventOffsetAsync(); + commitEvents[i++] = mongoEvent; + } var commit = new MongoEventCommit { Id = commitId, Events = commitEvents, - EventsOffset = offset, - EventsCount = commitEvents.Count, + EventsCount = eventsCount, EventStream = streamName, EventStreamOffset = expectedVersion, - Timestamp = now + Timestamp = EmptyTimestamp }; + + try + { + await Collection.InsertOneAsync(commit); - for (var retry = 0; retry < Retries; retry++) + notifier.NotifyEventsStored(); + } + catch (MongoWriteException ex) { - try + if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) { - await Collection.InsertOneAsync(commit); + var currentVersion = await GetEventStreamOffset(streamName); - notifier.NotifyEventsStored(); - - return; - } - catch (MongoWriteException ex) - { - if (ex.Message.IndexOf(eventsOffsetIndex, StringComparison.OrdinalIgnoreCase) >= 0) - { - commit.EventsOffset = await GetEventOffsetAsync(); - } - else if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) - { - currentVersion = await GetEventStreamOffset(streamName); - - throw new WrongEventVersionException(currentVersion, expectedVersion); - } - else - { - throw; - } + throw new WrongEventVersionException(currentVersion, expectedVersion); } + + throw; } } } - private async Task GetPreviousOffsetAsync(long startEventNumber) + private async Task GetEventStreamOffset(string streamName) { var document = - await Collection.Find(x => x.EventsOffset <= startEventNumber) + await Collection.Find(x => x.EventStream == streamName) .Project(Project - .Include(x => x.EventsOffset)) - .SortByDescending(x => x.EventsOffset).Limit(1) + .Include(x => x.EventStreamOffset) + .Include(x => x.EventsCount)) + .SortByDescending(x => x.EventStreamOffset).Limit(1) .FirstOrDefaultAsync(); if (document != null) { - return document["EventsOffset"].ToInt64(); + return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64(); } return -1; } - private async Task GetEventOffsetAsync() + private static string CreateToken(BsonTimestamp timestamp, int eventStreamNumber) { - var document = - await Collection.Find(new BsonDocument()) - .Project(Project - .Include(x => x.EventsOffset) - .Include(x => x.EventsCount)) - .SortByDescending(x => x.EventsOffset).Limit(1) - .FirstOrDefaultAsync(); + var parts = new object[] { timestamp.Timestamp, timestamp.Increment, eventStreamNumber }; - if (document != null) - { - return document["EventsOffset"].ToInt64() + document["EventsCount"].ToInt64(); - } - - return -1; + return string.Join("$", parts); } - private async Task GetEventStreamOffset(string streamName) + private static (BsonTimestamp Timestamp, int EventStreamNumber) ParsePosition(string position) { - var document = - await Collection.Find(x => x.EventStream == streamName) - .Project(Project - .Include(x => x.EventStreamOffset) - .Include(x => x.EventsCount)) - .SortByDescending(x => x.EventsOffset).Limit(1) - .FirstOrDefaultAsync(); - - if (document != null) - { - return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64(); - } + var parts = position.Split('$'); - return -1; + return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2])); } } } diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs index b1b179913..2c7fec6d5 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs @@ -32,6 +32,6 @@ namespace Squidex.Infrastructure.MongoDb [BsonElement] [BsonRequired] - public long LastHandledEventNumber { get; set; } + public string Position { get; set; } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs index dabe44358..11dfdbad3 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs @@ -47,7 +47,7 @@ namespace Squidex.Infrastructure.MongoDb { try { - await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = -1 }); + await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, Position = null }); } catch (MongoWriteException ex) { @@ -61,31 +61,27 @@ namespace Squidex.Infrastructure.MongoDb public Task StartAsync(string consumerName) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, - Update.Unset(x => x.IsStopped)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Unset(x => x.IsStopped)); } public Task StopAsync(string consumerName, string error = null) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, - Update.Set(x => x.IsStopped, true).Set(x => x.Error, error)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true).Set(x => x.Error, error)); } public Task ResetAsync(string consumerName) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, - Update.Set(x => x.IsResetting, true)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true)); } - public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber) + public Task SetLastHandledEventNumberAsync(string consumerName, string position) { - return Collection.ReplaceOneAsync(x => x.Name == consumerName, - CreateEntity(consumerName, eventNumber)); + return Collection.ReplaceOneAsync(x => x.Name == consumerName, CreateEntity(consumerName, position)); } - private static MongoEventConsumerInfo CreateEntity(string consumerName, long eventNumber) + private static MongoEventConsumerInfo CreateEntity(string consumerName, string position) { - return new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = eventNumber }; + return new MongoEventConsumerInfo { Name = consumerName, Position = position }; } } } diff --git a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj index e2d8bc9af..df9225f17 100644 --- a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj +++ b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj @@ -11,5 +11,6 @@ + diff --git a/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj b/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj index ab2f8af4c..4d863d14f 100644 --- a/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj +++ b/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj @@ -8,6 +8,7 @@ + diff --git a/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj b/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj index 6cb63c49e..dc7458aa0 100644 --- a/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj +++ b/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj @@ -10,6 +10,7 @@ - + + diff --git a/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs b/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs index 80886571a..485ae0668 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs @@ -14,12 +14,12 @@ namespace Squidex.Infrastructure.CQRS.Events { public static class EnvelopeExtensions { - public static long EventNumber(this EnvelopeHeaders headers) + public static string EventPosition(this EnvelopeHeaders headers) { - return headers[CommonHeaders.EventNumber].ToInt32(CultureInfo.InvariantCulture); + return headers[CommonHeaders.EventNumber].ToString(); } - public static Envelope SetEventNumber(this Envelope envelope, long value) where T : class + public static Envelope SetEventPosition(this Envelope envelope, string value) where T : class { envelope.Headers.Set(CommonHeaders.EventNumber, value); diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs index 08b169e32..44ccbef61 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs @@ -11,6 +11,7 @@ using System.Threading.Tasks; using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Timers; +// ReSharper disable ConvertToLambdaExpression // ReSharper disable MethodSupportsCancellation // ReSharper disable ConvertIfStatementToConditionalTernaryExpression // ReSharper disable InvertIf @@ -97,21 +98,23 @@ namespace Squidex.Infrastructure.CQRS.Events { var status = await eventConsumerInfoRepository.FindAsync(consumerName); - var lastHandledEventNumber = status.LastHandledEventNumber; + var position = status.Position; if (status.IsResetting) { await ResetAsync(eventConsumer, consumerName); - lastHandledEventNumber = -1; + position = null; } else if (status.IsStopped) { return; } - await eventStore.GetEventsAsync(se => HandleEventAsync(eventConsumer, se, consumerName), ct, - eventConsumer.EventsFilter, lastHandledEventNumber); + await eventStore.GetEventsAsync(se => + { + return HandleEventAsync(eventConsumer, se, consumerName); + }, ct, eventConsumer.EventsFilter, position); } catch (Exception ex) { @@ -133,8 +136,9 @@ namespace Squidex.Infrastructure.CQRS.Events return; } - await DispatchConsumer(@event, eventConsumer); - await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventNumber); + await DispatchConsumer(@event, eventConsumer, consumerName); + + await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventPosition); } private async Task ResetAsync(IEventConsumer eventConsumer, string consumerName) @@ -149,7 +153,7 @@ namespace Squidex.Infrastructure.CQRS.Events .WriteProperty("eventConsumer", eventConsumer.GetType().Name)); await eventConsumer.ClearAsync(); - await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1); + await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, null); log.LogInformation(w => w .WriteProperty("action", "EventConsumerReset") @@ -169,7 +173,7 @@ namespace Squidex.Infrastructure.CQRS.Events } } - private async Task DispatchConsumer(Envelope @event, IEventConsumer eventConsumer) + private async Task DispatchConsumer(Envelope @event, IEventConsumer eventConsumer, string consumerName) { var eventId = @event.Headers.EventId().ToString(); var eventType = @event.Payload.GetType().Name; @@ -181,7 +185,7 @@ namespace Squidex.Infrastructure.CQRS.Events .WriteProperty("state", "Started") .WriteProperty("eventId", eventId) .WriteProperty("eventType", eventType) - .WriteProperty("eventConsumer", eventConsumer.GetType().Name)); + .WriteProperty("eventConsumer", consumerName)); await eventConsumer.On(@event); @@ -191,7 +195,7 @@ namespace Squidex.Infrastructure.CQRS.Events .WriteProperty("state", "Completed") .WriteProperty("eventId", eventId) .WriteProperty("eventType", eventType) - .WriteProperty("eventConsumer", eventConsumer.GetType().Name)); + .WriteProperty("eventConsumer", consumerName)); } catch (Exception ex) { @@ -201,7 +205,7 @@ namespace Squidex.Infrastructure.CQRS.Events .WriteProperty("state", "Started") .WriteProperty("eventId", eventId) .WriteProperty("eventType", eventType) - .WriteProperty("eventConsumer", eventConsumer.GetType().Name)); + .WriteProperty("eventConsumer", consumerName)); throw; } @@ -213,7 +217,7 @@ namespace Squidex.Infrastructure.CQRS.Events { var @event = formatter.Parse(storedEvent.Data); - @event.SetEventNumber(storedEvent.EventNumber); + @event.SetEventPosition(storedEvent.EventPosition); @event.SetEventStreamNumber(storedEvent.EventStreamNumber); return @event; @@ -228,7 +232,7 @@ namespace Squidex.Infrastructure.CQRS.Events .WriteProperty("action", "ParseEvent") .WriteProperty("state", "Failed") .WriteProperty("eventId", storedEvent.Data.EventId.ToString()) - .WriteProperty("eventNumber", storedEvent.EventNumber)); + .WriteProperty("eventPosition", storedEvent.EventPosition)); throw; } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs index 492c73a8b..3e3551f8c 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs @@ -10,8 +10,6 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventConsumerInfo { - long LastHandledEventNumber { get; } - bool IsStopped { get; } bool IsResetting { get; } @@ -19,5 +17,7 @@ namespace Squidex.Infrastructure.CQRS.Events string Name { get; } string Error { get; } + + string Position { get; } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs index 5e77c01ae..6df6475e2 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs @@ -25,6 +25,6 @@ namespace Squidex.Infrastructure.CQRS.Events Task ResetAsync(string consumerName); - Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber); + Task SetLastHandledEventNumberAsync(string consumerName, string position); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs index ef69f6b03..f3f6491eb 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs @@ -15,10 +15,10 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventStore { - IObservable GetEventsAsync(string streamFilter = null, long lastReceivedEventNumber = -1); + IObservable GetEventsAsync(string streamFilter = null, string position = null); - Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1); + Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null); - Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events); + Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs b/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs index 547956c13..2029df12e 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs @@ -10,31 +10,32 @@ namespace Squidex.Infrastructure.CQRS.Events { public sealed class StoredEvent { - private readonly long eventNumber; - private readonly long eventStreamNumber; + private readonly string eventPosition; + private readonly int eventStreamNumber; private readonly EventData data; - public long EventNumber + public string EventPosition { - get { return eventNumber; } + get { return eventPosition; } } - public long EventStreamNumber + public EventData Data { - get { return eventStreamNumber; } + get { return data; } } - public EventData Data + public int EventStreamNumber { - get { return data; } + get { return eventStreamNumber; } } - public StoredEvent(long eventNumber, long eventStreamNumber, EventData data) + public StoredEvent(string eventPosition, int eventStreamNumber, EventData data) { + Guard.NotNullOrEmpty(eventPosition, nameof(eventPosition)); Guard.NotNull(data, nameof(data)); this.data = data; - this.eventNumber = eventNumber; + this.eventPosition = eventPosition; this.eventStreamNumber = eventStreamNumber; } } diff --git a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj index a52fa4d98..e38f9d1ca 100644 --- a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj +++ b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj @@ -11,11 +11,12 @@ - + + diff --git a/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj b/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj index ab6a992d6..3d222610b 100644 --- a/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj +++ b/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj @@ -19,6 +19,7 @@ + diff --git a/src/Squidex.Read/Squidex.Read.csproj b/src/Squidex.Read/Squidex.Read.csproj index 4bc4ca905..e949d599b 100644 --- a/src/Squidex.Read/Squidex.Read.csproj +++ b/src/Squidex.Read/Squidex.Read.csproj @@ -18,5 +18,6 @@ + diff --git a/src/Squidex.Write/Squidex.Write.csproj b/src/Squidex.Write/Squidex.Write.csproj index 55bc89d81..89971bac2 100644 --- a/src/Squidex.Write/Squidex.Write.csproj +++ b/src/Squidex.Write/Squidex.Write.csproj @@ -15,5 +15,6 @@ + diff --git a/src/Squidex/Squidex.csproj b/src/Squidex/Squidex.csproj index 49d21c99f..b4c6a37e9 100644 --- a/src/Squidex/Squidex.csproj +++ b/src/Squidex/Squidex.csproj @@ -65,13 +65,14 @@ - + - + - - + + + diff --git a/tests/Benchmarks/Benchmarks.csproj b/tests/Benchmarks/Benchmarks.csproj index 664de6b95..e6995c8d6 100644 --- a/tests/Benchmarks/Benchmarks.csproj +++ b/tests/Benchmarks/Benchmarks.csproj @@ -9,5 +9,6 @@ + diff --git a/tests/Benchmarks/IBenchmark.cs b/tests/Benchmarks/IBenchmark.cs index 19eff0e07..a67b4171f 100644 --- a/tests/Benchmarks/IBenchmark.cs +++ b/tests/Benchmarks/IBenchmark.cs @@ -5,7 +5,6 @@ // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== - namespace Benchmarks { public interface IBenchmark diff --git a/tests/Benchmarks/Program.cs b/tests/Benchmarks/Program.cs index 595223d34..5859fe367 100644 --- a/tests/Benchmarks/Program.cs +++ b/tests/Benchmarks/Program.cs @@ -1,4 +1,12 @@ -using System; +// ========================================================================== +// Program.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -10,7 +18,8 @@ namespace Benchmarks { private static readonly List Benchmarks = new List { - new AppendToEventStore() + new AppendToEventStore(), + new AppendToEventStoreParallel() }; public static void Main(string[] args) diff --git a/tests/Benchmarks/Properties/launchSettings.json b/tests/Benchmarks/Properties/launchSettings.json index 2e81874bd..5d43f5fe1 100644 --- a/tests/Benchmarks/Properties/launchSettings.json +++ b/tests/Benchmarks/Properties/launchSettings.json @@ -2,7 +2,7 @@ "profiles": { "Benchmarks": { "commandName": "Project", - "commandLineArgs": "appendToEventStore" + "commandLineArgs": "appendToEventStoreParallel" } } } \ No newline at end of file diff --git a/tests/Benchmarks/Tests/AppendToEventStore.cs b/tests/Benchmarks/Tests/AppendToEventStore.cs index 5eab44b5c..6387bce5e 100644 --- a/tests/Benchmarks/Tests/AppendToEventStore.cs +++ b/tests/Benchmarks/Tests/AppendToEventStore.cs @@ -7,8 +7,8 @@ // ========================================================================== using System; +using Benchmarks.Utils; using MongoDB.Driver; -using NodaTime; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.MongoDb.EventStore; @@ -20,14 +20,6 @@ namespace Benchmarks.Tests private IMongoClient mongoClient; private IMongoDatabase mongoDatabase; - private static readonly EventData EventData = new EventData - { - EventId = Guid.NewGuid(), - Metadata = "EventMetdata", - Payload = "EventPayload", - Type = "MyEvent" - }; - public string Id { get { return "appendToEventStore"; } @@ -46,15 +38,14 @@ namespace Benchmarks.Tests public void RunInitialize() { mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); - mongoDatabase.CreateCollection("Test"); } public long Run() { - const long numCommits = 10; + const long numCommits = 200; const long eventStreams = 10; - var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()), SystemClock.Instance); + var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub())); for (var streamId = 0; streamId < eventStreams; streamId++) { @@ -63,7 +54,7 @@ namespace Benchmarks.Tests for (var commitId = 0; commitId < numCommits; commitId++) { - eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { EventData }).Wait(); + eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait(); eventOffset++; } diff --git a/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs b/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs new file mode 100644 index 000000000..ababb07f5 --- /dev/null +++ b/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs @@ -0,0 +1,76 @@ +// ========================================================================== +// AppendToEventStoreParallel.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using Benchmarks.Utils; +using MongoDB.Driver; +using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.MongoDb.EventStore; + +namespace Benchmarks.Tests +{ + public sealed class AppendToEventStoreParallel : IBenchmark + { + private IMongoClient mongoClient; + private IMongoDatabase mongoDatabase; + + public string Id + { + get { return "appendToEventStoreParallel"; } + } + + public string Name + { + get { return "Append Events to EventStore Parallel"; } + } + + public void Initialize() + { + mongoClient = new MongoClient("mongodb://localhost"); + } + + public void RunInitialize() + { + mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); + } + + public long Run() + { + const long numCommits = 200; + const long eventStreams = 10; + + var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub())); + + Parallel.For(0, eventStreams, streamId => + { + var eventOffset = -1; + var streamName = streamId.ToString(); + + for (var commitId = 0; commitId < numCommits; commitId++) + { + eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait(); + + eventOffset++; + } + }); + + return numCommits * eventStreams; + } + + public void RunCleanup() + { + mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName); + } + + public void Cleanup() + { + } + } +} diff --git a/tests/Benchmarks/Utils/Helper.cs b/tests/Benchmarks/Utils/Helper.cs new file mode 100644 index 000000000..576740b70 --- /dev/null +++ b/tests/Benchmarks/Utils/Helper.cs @@ -0,0 +1,22 @@ +// ========================================================================== +// Helper.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using Squidex.Infrastructure.CQRS.Events; + +namespace Benchmarks.Utils +{ + public static class Helper + { + public static EventData CreateEventData() + { + return new EventData { EventId = Guid.NewGuid(), Metadata = "EventMetdata", Payload = "EventPayload", Type = "MyEvent" }; + + } + } +} diff --git a/tests/RunCoverage.ps1 b/tests/RunCoverage.ps1 index aa0978b6f..5e9194cca 100644 --- a/tests/RunCoverage.ps1 +++ b/tests/RunCoverage.ps1 @@ -50,6 +50,6 @@ New-Item -ItemType directory -Path $reportsFolder -output:"$workingFolder\$reportsFolder\Read.xml" ` -oldStyle -&"$userProfile\.nuget\packages\ReportGenerator\2.5.8\tools\ReportGenerator.exe" ` +&"$userProfile\.nuget\packages\ReportGenerator\2.5.9\tools\ReportGenerator.exe" ` -reports:"$workingFolder\$reportsFolder\*.xml" ` -targetdir:"$workingFolder\$reportsFolder\Output" \ No newline at end of file diff --git a/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj b/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj index 27b4d85aa..1ca55e6c7 100644 --- a/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj +++ b/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj @@ -10,9 +10,10 @@ - + - + + diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs index 167f44a9a..00c598911 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs @@ -74,7 +74,7 @@ namespace Squidex.Infrastructure.CQRS.Commands [Fact] public async Task Should_throw_exception_when_event_store_returns_no_events() { - eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(Observable.Empty()); + eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Observable.Empty()); await Assert.ThrowsAsync(() => sut.GetByIdAsync(aggregateId)); } @@ -90,11 +90,11 @@ namespace Squidex.Infrastructure.CQRS.Commands var events = new[] { - new StoredEvent(0, 0, eventData1), - new StoredEvent(1, 1, eventData2) + new StoredEvent("0", 0, eventData1), + new StoredEvent("1", 1, eventData2) }; - eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable()); + eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable()); eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope(event1)); eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope(event2)); @@ -115,11 +115,11 @@ namespace Squidex.Infrastructure.CQRS.Commands var events = new[] { - new StoredEvent(0, 0, eventData1), - new StoredEvent(1, 1, eventData2) + new StoredEvent("0", 0, eventData1), + new StoredEvent("1", 1, eventData2) }; - eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable()); + eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable()); eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope(event1)); eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope(event2)); @@ -141,7 +141,7 @@ namespace Squidex.Infrastructure.CQRS.Commands eventDataFormatter.Setup(x => x.ToEventData(It.Is>(e => e.Payload == event1), commitId)).Returns(eventData1); eventDataFormatter.Setup(x => x.ToEventData(It.Is>(e => e.Payload == event2), commitId)).Returns(eventData2); - eventStore.Setup(x => x.AppendEventsAsync(commitId, streamName, 123, It.Is>(e => e.Count() == 2))) + eventStore.Setup(x => x.AppendEventsAsync(commitId, streamName, 123, It.Is>(e => e.Count() == 2))) .Returns(TaskHelper.Done) .Verifiable(); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs index c66c25fe3..f626afb6e 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs @@ -65,12 +65,12 @@ namespace Squidex.Infrastructure.CQRS.Events [Fact] public void Should_set_and_get_event_number() { - const int eventNumber = 123; + const string eventNumber = "123"; - sut.SetEventNumber(eventNumber); + sut.SetEventPosition(eventNumber); - Assert.Equal(eventNumber, sut.Headers.EventNumber()); - Assert.Equal(eventNumber, sut.Headers["EventNumber"].ToInt32(culture)); + Assert.Equal(eventNumber, sut.Headers.EventPosition()); + Assert.Equal(eventNumber, sut.Headers["EventNumber"].ToString()); } [Fact] diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs index e76e34877..d0d55be94 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs @@ -41,7 +41,7 @@ namespace Squidex.Infrastructure.CQRS.Events inputEvent.SetAggregateId(Guid.NewGuid()); inputEvent.SetCommitId(commitId); inputEvent.SetEventId(Guid.NewGuid()); - inputEvent.SetEventNumber(1); + inputEvent.SetEventPosition("1"); inputEvent.SetEventStreamNumber(1); inputEvent.SetTimestamp(SystemClock.Instance.GetCurrentInstant()); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs index c8c6844a2..15a99eba6 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs @@ -27,13 +27,15 @@ namespace Squidex.Infrastructure.CQRS.Events private sealed class MyEventConsumerInfo : IEventConsumerInfo { - public long LastHandledEventNumber { get; set; } - public bool IsStopped { get; set; } + public bool IsResetting { get; set; } public string Name { get; set; } + public string Error { get; set; } + + public string Position { get; set; } } private sealed class MyEventStore : IEventStore @@ -45,7 +47,7 @@ namespace Squidex.Infrastructure.CQRS.Events this.storedEvents = storedEvents; } - public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1) + public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) { foreach (var @event in storedEvents) { @@ -53,12 +55,12 @@ namespace Squidex.Infrastructure.CQRS.Events } } - public IObservable GetEventsAsync(string streamFilter = null, long lastReceivedEventNumber = -1) + public IObservable GetEventsAsync(string streamFilter = null, string position = null) { throw new NotSupportedException(); } - public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events) + public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events) { throw new NotSupportedException(); } @@ -83,9 +85,9 @@ namespace Squidex.Infrastructure.CQRS.Events { var events = new[] { - new StoredEvent(3, 3, eventData1), - new StoredEvent(4, 4, eventData2), - new StoredEvent(5, 5, eventData3) + new StoredEvent("3", 3, eventData1), + new StoredEvent("4", 4, eventData2), + new StoredEvent("5", 5, eventData3) }; consumerName = eventConsumer.Object.GetType().Name; @@ -116,7 +118,7 @@ namespace Squidex.Infrastructure.CQRS.Events [Fact] public void Should_subscribe_to_consumer_and_handle_events() { - consumerInfo.LastHandledEventNumber = 2L; + consumerInfo.Position = "2"; sut.Subscribe(eventConsumer.Object); sut.Next(); @@ -130,7 +132,7 @@ namespace Squidex.Infrastructure.CQRS.Events [Fact] public void Should_abort_if_handling_failed() { - consumerInfo.LastHandledEventNumber = 2L; + consumerInfo.Position = "2"; eventConsumer.Setup(x => x.On(envelope1)).Returns(TaskHelper.True); eventConsumer.Setup(x => x.On(envelope2)).Throws(new InvalidOperationException()); @@ -149,7 +151,7 @@ namespace Squidex.Infrastructure.CQRS.Events [Fact] public void Should_abort_if_serialization_failed() { - consumerInfo.LastHandledEventNumber = 2L; + consumerInfo.Position = "2"; formatter.Setup(x => x.Parse(eventData2)).Throws(new InvalidOperationException()); @@ -168,7 +170,7 @@ namespace Squidex.Infrastructure.CQRS.Events public void Should_reset_if_requested() { consumerInfo.IsResetting = true; - consumerInfo.LastHandledEventNumber = 2L; + consumerInfo.Position = "2"; sut.Subscribe(eventConsumer.Object); sut.Next(); diff --git a/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj b/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj index f03670b95..d552c8f72 100644 --- a/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj +++ b/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj @@ -8,11 +8,12 @@ - + - + + diff --git a/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj b/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj index 4c7a3cb0a..9cda57a9f 100644 --- a/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj +++ b/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj @@ -12,10 +12,11 @@ - + - + + diff --git a/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj b/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj index 4e86bea48..c4e6ef2e5 100644 --- a/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj +++ b/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj @@ -11,10 +11,11 @@ - + - + + diff --git a/tools/Migrate_01/Migrate_01.csproj b/tools/Migrate_01/Migrate_01.csproj new file mode 100644 index 000000000..8403c40b3 --- /dev/null +++ b/tools/Migrate_01/Migrate_01.csproj @@ -0,0 +1,9 @@ + + + Exe + netcoreapp1.1 + + + + + diff --git a/tools/Migrate_01/Program.cs b/tools/Migrate_01/Program.cs new file mode 100644 index 000000000..e0aed2cf0 --- /dev/null +++ b/tools/Migrate_01/Program.cs @@ -0,0 +1,94 @@ +// ========================================================================== +// Program.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace Migrate_01 +{ + public class Program + { + public static void Main(string[] args) + { + Console.WriteLine("Migrate EventStore"); + + var mongoClient = new MongoClient(GetMongoConnectionValue()); + var mongoDatabase = mongoClient.GetDatabase(GetMongoDatabaseValue()); + + var collection = mongoDatabase.GetCollection("Events"); + + Console.Write("Migrate Indices....."); + + collection.Indexes.DropAll(); + + Console.WriteLine("DONE"); + + var query = + collection.Find(new BsonDocument()) + .Project( + Builders.Projection.Include(Field("EventsOffset"))) + .ToList(); + + Console.Write("Migrate Documents..."); + + foreach (var eventCommit in query) + { + var eventsOffset = (int)eventCommit["EventsOffset"].AsInt64; + + var ts = new BsonTimestamp(eventsOffset + 10, 1); + + collection.UpdateOne( + Builders.Filter + .Eq(Field("_id"), eventCommit["_id"].AsString), + Builders.Update + .Set(Field("Timestamp"), ts).Unset(Field("EventsOffset"))); + } + + Console.WriteLine("DONE"); + } + + private static StringFieldDefinition Field(string fieldName) + { + return new StringFieldDefinition(fieldName); + } + + private static StringFieldDefinition Field(string fieldName) + { + return new StringFieldDefinition(fieldName); + } + + private static string GetMongoConnectionValue() + { + Console.Write("Mongo Connection (ENTER for 'mongodb://localhost'): "); + + var mongoConnection = Console.ReadLine(); + + if (string.IsNullOrWhiteSpace(mongoConnection)) + { + mongoConnection = "mongodb://localhost"; + } + + return mongoConnection; + } + + private static string GetMongoDatabaseValue() + { + Console.Write("Mongo Database (ENTER for 'Squidex'): "); + + var mongoDatabase = Console.ReadLine(); + + if (string.IsNullOrWhiteSpace(mongoDatabase)) + { + mongoDatabase = "Squidex"; + } + + return mongoDatabase; + } + } +}