Browse Source

Benchmarks and performance improvements.

pull/65/head
Sebastian Stehle 9 years ago
parent
commit
a2d04e9acd
  1. 17
      Squidex.sln
  2. 2
      src/Squidex.Core/Squidex.Core.csproj
  3. 1
      src/Squidex.Events/Squidex.Events.csproj
  4. 3
      src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj
  5. 13
      src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs
  6. 164
      src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs
  7. 2
      src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs
  8. 20
      src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs
  9. 1
      src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj
  10. 1
      src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj
  11. 3
      src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj
  12. 6
      src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs
  13. 30
      src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
  14. 4
      src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs
  15. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs
  16. 6
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  17. 21
      src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs
  18. 3
      src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
  19. 1
      src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj
  20. 1
      src/Squidex.Read/Squidex.Read.csproj
  21. 1
      src/Squidex.Write/Squidex.Write.csproj
  22. 9
      src/Squidex/Squidex.csproj
  23. 1
      tests/Benchmarks/Benchmarks.csproj
  24. 1
      tests/Benchmarks/IBenchmark.cs
  25. 13
      tests/Benchmarks/Program.cs
  26. 2
      tests/Benchmarks/Properties/launchSettings.json
  27. 17
      tests/Benchmarks/Tests/AppendToEventStore.cs
  28. 76
      tests/Benchmarks/Tests/AppendToEventStoreParallel.cs
  29. 22
      tests/Benchmarks/Utils/Helper.cs
  30. 2
      tests/RunCoverage.ps1
  31. 5
      tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj
  32. 16
      tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs
  33. 8
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs
  34. 2
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs
  35. 26
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs
  36. 5
      tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj
  37. 5
      tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj
  38. 5
      tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj
  39. 9
      tools/Migrate_01/Migrate_01.csproj
  40. 94
      tools/Migrate_01/Program.cs

17
Squidex.sln

@ -40,6 +40,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{B56EBCEC
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmarks", "tests\Benchmarks\Benchmarks.csproj", "{D48A03DF-BCD3-4667-8747-2F251347E2B6}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmarks", "tests\Benchmarks\Benchmarks.csproj", "{D48A03DF-BCD3-4667-8747-2F251347E2B6}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "migrations", "migrations", "{94207AA6-4923-4183-A558-E0F8196B8CA3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Migrate_01", "tools\Migrate_01\Migrate_01.csproj", "{B51126A8-0D75-4A79-867D-10724EC6AC84}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -190,6 +194,18 @@ Global
{D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x64.Build.0 = Release|Any CPU {D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x64.Build.0 = Release|Any CPU
{D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.ActiveCfg = Release|Any CPU {D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.ActiveCfg = Release|Any CPU
{D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.Build.0 = Release|Any CPU {D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.Build.0 = Release|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x64.ActiveCfg = Debug|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x64.Build.0 = Debug|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x86.ActiveCfg = Debug|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x86.Build.0 = Debug|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|Any CPU.Build.0 = Release|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x64.ActiveCfg = Release|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x64.Build.0 = Release|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x86.ActiveCfg = Release|Any CPU
{B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -210,5 +226,6 @@ Global
{C1E5BBB6-6B6A-4DE5-B19D-0538304DE343} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
{945871B1-77B8-43FB-B53C-27CF385AB756} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {945871B1-77B8-43FB-B53C-27CF385AB756} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
{D48A03DF-BCD3-4667-8747-2F251347E2B6} = {B56EBCEC-9C50-46A7-848C-65502DE69C5C} {D48A03DF-BCD3-4667-8747-2F251347E2B6} = {B56EBCEC-9C50-46A7-848C-65502DE69C5C}
{B51126A8-0D75-4A79-867D-10724EC6AC84} = {94207AA6-4923-4183-A558-E0F8196B8CA3}
EndGlobalSection EndGlobalSection
EndGlobal EndGlobal

2
src/Squidex.Core/Squidex.Core.csproj

@ -14,7 +14,7 @@
<PackageReference Include="protobuf-net" Version="2.2.1" /> <PackageReference Include="protobuf-net" Version="2.2.1" />
<PackageReference Include="System.Collections.Immutable" Version="1.3.1" /> <PackageReference Include="System.Collections.Immutable" Version="1.3.1" />
<PackageReference Include="NodaTime" Version="2.0.3" /> <PackageReference Include="NodaTime" Version="2.0.3" />
<PackageReference Include="NJsonSchema" Version="9.1.11" /> <PackageReference Include="NJsonSchema" Version="9.2.5" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" /> <PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.6' "> <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.6' ">

1
src/Squidex.Events/Squidex.Events.csproj

@ -13,5 +13,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="NodaTime" Version="2.0.3" /> <PackageReference Include="NodaTime" Version="2.0.3" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

3
src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj

@ -7,7 +7,8 @@
<DebugSymbols>True</DebugSymbols> <DebugSymbols>True</DebugSymbols>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Google.Cloud.Storage.V1" Version="1.0.0" /> <PackageReference Include="Google.Cloud.Storage.V1" Version="2.0.0" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> <ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />

13
src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs

@ -10,7 +10,6 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using MongoDB.Bson; using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes; using MongoDB.Bson.Serialization.Attributes;
using NodaTime;
namespace Squidex.Infrastructure.MongoDb.EventStore namespace Squidex.Infrastructure.MongoDb.EventStore
{ {
@ -23,15 +22,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
[BsonRequired] [BsonRequired]
[BsonElement] [BsonElement]
public Instant Timestamp { get; set; } public BsonTimestamp Timestamp { get; set; }
[BsonElement] [BsonElement]
[BsonRequired] [BsonRequired]
public List<MongoEvent> Events { get; set; } public MongoEvent[] Events { get; set; }
[BsonElement]
[BsonRequired]
public long EventsOffset { get; set; }
[BsonElement] [BsonElement]
[BsonRequired] [BsonRequired]
@ -39,10 +34,10 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
[BsonElement] [BsonElement]
[BsonRequired] [BsonRequired]
public string EventStream { get; set; } public long EventsCount { get; set; }
[BsonElement] [BsonElement]
[BsonRequired] [BsonRequired]
public long EventsCount { get; set; } public string EventStream { get; set; }
} }
} }

164
src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs

@ -8,15 +8,12 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; using System.Reactive.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MongoDB.Bson; using MongoDB.Bson;
using MongoDB.Driver; using MongoDB.Driver;
using NodaTime;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.Reflection;
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression // ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable ClassNeverInstantiated.Local // ReSharper disable ClassNeverInstantiated.Local
@ -28,18 +25,14 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{ {
private const int Retries = 500; private const int Retries = 500;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private readonly IEventNotifier notifier; private readonly IEventNotifier notifier;
private readonly IClock clock;
private string eventsOffsetIndex;
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier, IClock clock) public MongoEventStore(IMongoDatabase database, IEventNotifier notifier)
: base(database) : base(database)
{ {
Guard.NotNull(clock, nameof(clock));
Guard.NotNull(notifier, nameof(notifier)); Guard.NotNull(notifier, nameof(notifier));
this.clock = clock;
this.notifier = notifier; this.notifier = notifier;
} }
@ -55,17 +48,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
protected override async Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) protected override async Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection)
{ {
var indexNames = await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Ascending(x => x.Timestamp));
await Task.WhenAll( await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true });
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }),
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }),
collection.Indexes.CreateOneAsync(Index.Descending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }),
collection.Indexes.CreateOneAsync(Index.Descending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }));
eventsOffsetIndex = indexNames[0];
} }
public IObservable<StoredEvent> GetEventsAsync(string streamFilter, long lastReceivedEventNumber = -1) public IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null)
{ {
return Observable.Create<StoredEvent>((observer, ct) => return Observable.Create<StoredEvent>((observer, ct) =>
{ {
@ -74,22 +61,29 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
observer.OnNext(storedEvent); observer.OnNext(storedEvent);
return Tasks.TaskHelper.Done; return Tasks.TaskHelper.Done;
}, ct, streamFilter, lastReceivedEventNumber); }, ct, streamFilter, position);
}); });
} }
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1) public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{ {
Guard.NotNull(callback, nameof(callback)); Guard.NotNull(callback, nameof(callback));
var filters = new List<FilterDefinition<MongoEventCommit>>();
if (lastReceivedEventNumber >= 0) var tokenTimestamp = EmptyTimestamp;
var tokenEventStreamNumber = -1;
if (position != null)
{ {
var commitOffset = await GetPreviousOffsetAsync(lastReceivedEventNumber); var token = ParsePosition(position);
filters.Add(Filter.Gte(x => x.EventsOffset, commitOffset)); tokenTimestamp = token.Timestamp;
tokenEventStreamNumber = token.EventStreamNumber;
} }
var filters = new List<FilterDefinition<MongoEventCommit>>
{
Filter.Gte(x => x.Timestamp, tokenTimestamp)
};
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase)) if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase))
{ {
@ -114,140 +108,106 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
filter = filters[0]; filter = filters[0];
} }
await Collection.Find(filter).SortBy(x => x.EventsOffset).ForEachAsync(async commit => await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit =>
{ {
var eventNumber = commit.EventsOffset; var eventStreamNumber = (int)commit.EventStreamOffset;
var eventStreamNumber = commit.EventStreamOffset;
foreach (var mongoEvent in commit.Events) foreach (var e in commit.Events)
{ {
eventNumber++;
eventStreamNumber++; eventStreamNumber++;
if (eventNumber > lastReceivedEventNumber) if (eventStreamNumber > tokenEventStreamNumber)
{ {
var eventData = SimpleMapper.Map(mongoEvent, new EventData()); var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
var eventToken = CreateToken(commit.Timestamp, eventStreamNumber);
await callback(new StoredEvent(eventNumber, eventStreamNumber, eventData)); await callback(new StoredEvent(eventToken, eventStreamNumber, eventData));
} }
} }
}, cancellationToken); }, cancellationToken);
} }
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events) public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{ {
Guard.NotNullOrEmpty(streamName, nameof(streamName)); Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events)); Guard.NotNull(events, nameof(events));
var currentVersion = await GetEventStreamOffset(streamName); var eventsCount = events.Count;
if (currentVersion != expectedVersion) if (eventsCount > 0)
{ {
throw new WrongEventVersionException(currentVersion, expectedVersion); var commitEvents = new MongoEvent[events.Count];
}
var now = clock.GetCurrentInstant(); var i = 0;
var commitEvents = events.Select(x => SimpleMapper.Map(x, new MongoEvent())).ToList(); foreach (var e in events)
{
var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
if (commitEvents.Any()) commitEvents[i++] = mongoEvent;
{ }
var offset = await GetEventOffsetAsync();
var commit = new MongoEventCommit var commit = new MongoEventCommit
{ {
Id = commitId, Id = commitId,
Events = commitEvents, Events = commitEvents,
EventsOffset = offset, EventsCount = eventsCount,
EventsCount = commitEvents.Count,
EventStream = streamName, EventStream = streamName,
EventStreamOffset = expectedVersion, EventStreamOffset = expectedVersion,
Timestamp = now Timestamp = EmptyTimestamp
}; };
try
{
await Collection.InsertOneAsync(commit);
for (var retry = 0; retry < Retries; retry++) notifier.NotifyEventsStored();
}
catch (MongoWriteException ex)
{ {
try if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{ {
await Collection.InsertOneAsync(commit); var currentVersion = await GetEventStreamOffset(streamName);
notifier.NotifyEventsStored(); throw new WrongEventVersionException(currentVersion, expectedVersion);
return;
}
catch (MongoWriteException ex)
{
if (ex.Message.IndexOf(eventsOffsetIndex, StringComparison.OrdinalIgnoreCase) >= 0)
{
commit.EventsOffset = await GetEventOffsetAsync();
}
else if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
currentVersion = await GetEventStreamOffset(streamName);
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
else
{
throw;
}
} }
throw;
} }
} }
} }
private async Task<long> GetPreviousOffsetAsync(long startEventNumber) private async Task<long> GetEventStreamOffset(string streamName)
{ {
var document = var document =
await Collection.Find(x => x.EventsOffset <= startEventNumber) await Collection.Find(x => x.EventStream == streamName)
.Project<BsonDocument>(Project .Project<BsonDocument>(Project
.Include(x => x.EventsOffset)) .Include(x => x.EventStreamOffset)
.SortByDescending(x => x.EventsOffset).Limit(1) .Include(x => x.EventsCount))
.SortByDescending(x => x.EventStreamOffset).Limit(1)
.FirstOrDefaultAsync(); .FirstOrDefaultAsync();
if (document != null) if (document != null)
{ {
return document["EventsOffset"].ToInt64(); return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64();
} }
return -1; return -1;
} }
private async Task<long> GetEventOffsetAsync() private static string CreateToken(BsonTimestamp timestamp, int eventStreamNumber)
{ {
var document = var parts = new object[] { timestamp.Timestamp, timestamp.Increment, eventStreamNumber };
await Collection.Find(new BsonDocument())
.Project<BsonDocument>(Project
.Include(x => x.EventsOffset)
.Include(x => x.EventsCount))
.SortByDescending(x => x.EventsOffset).Limit(1)
.FirstOrDefaultAsync();
if (document != null) return string.Join("$", parts);
{
return document["EventsOffset"].ToInt64() + document["EventsCount"].ToInt64();
}
return -1;
} }
private async Task<long> GetEventStreamOffset(string streamName) private static (BsonTimestamp Timestamp, int EventStreamNumber) ParsePosition(string position)
{ {
var document = var parts = position.Split('$');
await Collection.Find(x => x.EventStream == streamName)
.Project<BsonDocument>(Project
.Include(x => x.EventStreamOffset)
.Include(x => x.EventsCount))
.SortByDescending(x => x.EventsOffset).Limit(1)
.FirstOrDefaultAsync();
if (document != null)
{
return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64();
}
return -1; return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2]));
} }
} }
} }

2
src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs

@ -32,6 +32,6 @@ namespace Squidex.Infrastructure.MongoDb
[BsonElement] [BsonElement]
[BsonRequired] [BsonRequired]
public long LastHandledEventNumber { get; set; } public string Position { get; set; }
} }
} }

20
src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs

@ -47,7 +47,7 @@ namespace Squidex.Infrastructure.MongoDb
{ {
try try
{ {
await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = -1 }); await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, Position = null });
} }
catch (MongoWriteException ex) catch (MongoWriteException ex)
{ {
@ -61,31 +61,27 @@ namespace Squidex.Infrastructure.MongoDb
public Task StartAsync(string consumerName) public Task StartAsync(string consumerName)
{ {
return Collection.UpdateOneAsync(x => x.Name == consumerName, return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Unset(x => x.IsStopped));
Update.Unset(x => x.IsStopped));
} }
public Task StopAsync(string consumerName, string error = null) public Task StopAsync(string consumerName, string error = null)
{ {
return Collection.UpdateOneAsync(x => x.Name == consumerName, return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true).Set(x => x.Error, error));
Update.Set(x => x.IsStopped, true).Set(x => x.Error, error));
} }
public Task ResetAsync(string consumerName) public Task ResetAsync(string consumerName)
{ {
return Collection.UpdateOneAsync(x => x.Name == consumerName, return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true));
Update.Set(x => x.IsResetting, true));
} }
public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber) public Task SetLastHandledEventNumberAsync(string consumerName, string position)
{ {
return Collection.ReplaceOneAsync(x => x.Name == consumerName, return Collection.ReplaceOneAsync(x => x.Name == consumerName, CreateEntity(consumerName, position));
CreateEntity(consumerName, eventNumber));
} }
private static MongoEventConsumerInfo CreateEntity(string consumerName, long eventNumber) private static MongoEventConsumerInfo CreateEntity(string consumerName, string position)
{ {
return new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = eventNumber }; return new MongoEventConsumerInfo { Name = consumerName, Position = position };
} }
} }
} }

1
src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj

@ -11,5 +11,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.4.4" /> <PackageReference Include="MongoDB.Driver" Version="2.4.4" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

1
src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj

@ -8,6 +8,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="4.1.3" /> <PackageReference Include="RabbitMQ.Client" Version="4.1.3" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> <ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />

3
src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj

@ -10,6 +10,7 @@
<ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> <ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="StackExchange.Redis.StrongName" Version="1.2.3" /> <PackageReference Include="StackExchange.Redis.StrongName" Version="1.2.4" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

6
src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs

@ -14,12 +14,12 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
public static class EnvelopeExtensions public static class EnvelopeExtensions
{ {
public static long EventNumber(this EnvelopeHeaders headers) public static string EventPosition(this EnvelopeHeaders headers)
{ {
return headers[CommonHeaders.EventNumber].ToInt32(CultureInfo.InvariantCulture); return headers[CommonHeaders.EventNumber].ToString();
} }
public static Envelope<T> SetEventNumber<T>(this Envelope<T> envelope, long value) where T : class public static Envelope<T> SetEventPosition<T>(this Envelope<T> envelope, string value) where T : class
{ {
envelope.Headers.Set(CommonHeaders.EventNumber, value); envelope.Headers.Set(CommonHeaders.EventNumber, value);

30
src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs

@ -11,6 +11,7 @@ using System.Threading.Tasks;
using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Timers; using Squidex.Infrastructure.Timers;
// ReSharper disable ConvertToLambdaExpression
// ReSharper disable MethodSupportsCancellation // ReSharper disable MethodSupportsCancellation
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression // ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable InvertIf // ReSharper disable InvertIf
@ -97,21 +98,23 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
var status = await eventConsumerInfoRepository.FindAsync(consumerName); var status = await eventConsumerInfoRepository.FindAsync(consumerName);
var lastHandledEventNumber = status.LastHandledEventNumber; var position = status.Position;
if (status.IsResetting) if (status.IsResetting)
{ {
await ResetAsync(eventConsumer, consumerName); await ResetAsync(eventConsumer, consumerName);
lastHandledEventNumber = -1; position = null;
} }
else if (status.IsStopped) else if (status.IsStopped)
{ {
return; return;
} }
await eventStore.GetEventsAsync(se => HandleEventAsync(eventConsumer, se, consumerName), ct, await eventStore.GetEventsAsync(se =>
eventConsumer.EventsFilter, lastHandledEventNumber); {
return HandleEventAsync(eventConsumer, se, consumerName);
}, ct, eventConsumer.EventsFilter, position);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -133,8 +136,9 @@ namespace Squidex.Infrastructure.CQRS.Events
return; return;
} }
await DispatchConsumer(@event, eventConsumer); await DispatchConsumer(@event, eventConsumer, consumerName);
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventNumber);
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventPosition);
} }
private async Task ResetAsync(IEventConsumer eventConsumer, string consumerName) private async Task ResetAsync(IEventConsumer eventConsumer, string consumerName)
@ -149,7 +153,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("eventConsumer", eventConsumer.GetType().Name)); .WriteProperty("eventConsumer", eventConsumer.GetType().Name));
await eventConsumer.ClearAsync(); await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1); await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, null);
log.LogInformation(w => w log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset") .WriteProperty("action", "EventConsumerReset")
@ -169,7 +173,7 @@ namespace Squidex.Infrastructure.CQRS.Events
} }
} }
private async Task DispatchConsumer(Envelope<IEvent> @event, IEventConsumer eventConsumer) private async Task DispatchConsumer(Envelope<IEvent> @event, IEventConsumer eventConsumer, string consumerName)
{ {
var eventId = @event.Headers.EventId().ToString(); var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name; var eventType = @event.Payload.GetType().Name;
@ -181,7 +185,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("state", "Started") .WriteProperty("state", "Started")
.WriteProperty("eventId", eventId) .WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType) .WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.GetType().Name)); .WriteProperty("eventConsumer", consumerName));
await eventConsumer.On(@event); await eventConsumer.On(@event);
@ -191,7 +195,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("state", "Completed") .WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId) .WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType) .WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.GetType().Name)); .WriteProperty("eventConsumer", consumerName));
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -201,7 +205,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("state", "Started") .WriteProperty("state", "Started")
.WriteProperty("eventId", eventId) .WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType) .WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.GetType().Name)); .WriteProperty("eventConsumer", consumerName));
throw; throw;
} }
@ -213,7 +217,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
var @event = formatter.Parse(storedEvent.Data); var @event = formatter.Parse(storedEvent.Data);
@event.SetEventNumber(storedEvent.EventNumber); @event.SetEventPosition(storedEvent.EventPosition);
@event.SetEventStreamNumber(storedEvent.EventStreamNumber); @event.SetEventStreamNumber(storedEvent.EventStreamNumber);
return @event; return @event;
@ -228,7 +232,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("action", "ParseEvent") .WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed") .WriteProperty("state", "Failed")
.WriteProperty("eventId", storedEvent.Data.EventId.ToString()) .WriteProperty("eventId", storedEvent.Data.EventId.ToString())
.WriteProperty("eventNumber", storedEvent.EventNumber)); .WriteProperty("eventPosition", storedEvent.EventPosition));
throw; throw;
} }

4
src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs

@ -10,8 +10,6 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
public interface IEventConsumerInfo public interface IEventConsumerInfo
{ {
long LastHandledEventNumber { get; }
bool IsStopped { get; } bool IsStopped { get; }
bool IsResetting { get; } bool IsResetting { get; }
@ -19,5 +17,7 @@ namespace Squidex.Infrastructure.CQRS.Events
string Name { get; } string Name { get; }
string Error { get; } string Error { get; }
string Position { get; }
} }
} }

2
src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs

@ -25,6 +25,6 @@ namespace Squidex.Infrastructure.CQRS.Events
Task ResetAsync(string consumerName); Task ResetAsync(string consumerName);
Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber); Task SetLastHandledEventNumberAsync(string consumerName, string position);
} }
} }

6
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -15,10 +15,10 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
public interface IEventStore public interface IEventStore
{ {
IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, long lastReceivedEventNumber = -1); IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null);
Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1); Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null);
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events); Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events);
} }
} }

21
src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs

@ -10,31 +10,32 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
public sealed class StoredEvent public sealed class StoredEvent
{ {
private readonly long eventNumber; private readonly string eventPosition;
private readonly long eventStreamNumber; private readonly int eventStreamNumber;
private readonly EventData data; private readonly EventData data;
public long EventNumber public string EventPosition
{ {
get { return eventNumber; } get { return eventPosition; }
} }
public long EventStreamNumber public EventData Data
{ {
get { return eventStreamNumber; } get { return data; }
} }
public EventData Data public int EventStreamNumber
{ {
get { return data; } get { return eventStreamNumber; }
} }
public StoredEvent(long eventNumber, long eventStreamNumber, EventData data) public StoredEvent(string eventPosition, int eventStreamNumber, EventData data)
{ {
Guard.NotNullOrEmpty(eventPosition, nameof(eventPosition));
Guard.NotNull(data, nameof(data)); Guard.NotNull(data, nameof(data));
this.data = data; this.data = data;
this.eventNumber = eventNumber; this.eventPosition = eventPosition;
this.eventStreamNumber = eventStreamNumber; this.eventStreamNumber = eventStreamNumber;
} }
} }

3
src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

@ -11,11 +11,12 @@
<PackageReference Include="ImageSharp" Version="1.0.0-alpha8-00049" /> <PackageReference Include="ImageSharp" Version="1.0.0-alpha8-00049" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="NodaTime" Version="2.0.3" /> <PackageReference Include="NodaTime" Version="2.0.3" />
<PackageReference Include="System.Linq" Version="4.3.0" /> <PackageReference Include="System.Linq" Version="4.3.0" />
<PackageReference Include="System.Reactive" Version="3.1.1" /> <PackageReference Include="System.Reactive" Version="3.1.1" />
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" /> <PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" />
<PackageReference Include="System.Security.Claims" Version="4.3.0" /> <PackageReference Include="System.Security.Claims" Version="4.3.0" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

1
src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj

@ -19,6 +19,7 @@
<PackageReference Include="Microsoft.AspNetCore.Identity" Version="1.1.2" /> <PackageReference Include="Microsoft.AspNetCore.Identity" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Identity.MongoDB" Version="1.0.2" /> <PackageReference Include="Microsoft.AspNetCore.Identity.MongoDB" Version="1.0.2" />
<PackageReference Include="MongoDB.Driver" Version="2.4.4" /> <PackageReference Include="MongoDB.Driver" Version="2.4.4" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.6' "> <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.6' ">
<PackageReference Include="Microsoft.OData.Core" Version="6.15.0" /> <PackageReference Include="Microsoft.OData.Core" Version="6.15.0" />

1
src/Squidex.Read/Squidex.Read.csproj

@ -18,5 +18,6 @@
<PackageReference Include="NodaTime" Version="2.0.3" /> <PackageReference Include="NodaTime" Version="2.0.3" />
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" /> <PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.7.0" /> <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.7.0" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

1
src/Squidex.Write/Squidex.Write.csproj

@ -15,5 +15,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="NodaTime" Version="2.0.3" /> <PackageReference Include="NodaTime" Version="2.0.3" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

9
src/Squidex/Squidex.csproj

@ -65,13 +65,14 @@
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="1.1.2" />
<PackageReference Include="MongoDB.Driver" Version="2.4.4" /> <PackageReference Include="MongoDB.Driver" Version="2.4.4" />
<PackageReference Include="NJsonSchema" Version="9.1.11" /> <PackageReference Include="NJsonSchema" Version="9.2.5" />
<PackageReference Include="NodaTime.Serialization.JsonNet" Version="2.0.0" /> <PackageReference Include="NodaTime.Serialization.JsonNet" Version="2.0.0" />
<PackageReference Include="NSwag.AspNetCore" Version="11.0.0" /> <PackageReference Include="NSwag.AspNetCore" Version="11.2.0" />
<PackageReference Include="OpenCover" Version="4.6.519" /> <PackageReference Include="OpenCover" Version="4.6.519" />
<PackageReference Include="ReportGenerator" Version="2.5.8" /> <PackageReference Include="ReportGenerator" Version="2.5.9" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="1.2.3" /> <PackageReference Include="StackExchange.Redis.StrongName" Version="1.2.4" />
<PackageReference Include="System.Linq" Version="4.3.0" /> <PackageReference Include="System.Linq" Version="4.3.0" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp1.1' "> <ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp1.1' ">

1
tests/Benchmarks/Benchmarks.csproj

@ -9,5 +9,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.4.4" /> <PackageReference Include="MongoDB.Driver" Version="2.4.4" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

1
tests/Benchmarks/IBenchmark.cs

@ -5,7 +5,6 @@
// Copyright (c) Squidex Group // Copyright (c) Squidex Group
// All rights reserved. // All rights reserved.
// ========================================================================== // ==========================================================================
namespace Benchmarks namespace Benchmarks
{ {
public interface IBenchmark public interface IBenchmark

13
tests/Benchmarks/Program.cs

@ -1,4 +1,12 @@
using System; // ==========================================================================
// Program.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
@ -10,7 +18,8 @@ namespace Benchmarks
{ {
private static readonly List<IBenchmark> Benchmarks = new List<IBenchmark> private static readonly List<IBenchmark> Benchmarks = new List<IBenchmark>
{ {
new AppendToEventStore() new AppendToEventStore(),
new AppendToEventStoreParallel()
}; };
public static void Main(string[] args) public static void Main(string[] args)

2
tests/Benchmarks/Properties/launchSettings.json

@ -2,7 +2,7 @@
"profiles": { "profiles": {
"Benchmarks": { "Benchmarks": {
"commandName": "Project", "commandName": "Project",
"commandLineArgs": "appendToEventStore" "commandLineArgs": "appendToEventStoreParallel"
} }
} }
} }

17
tests/Benchmarks/Tests/AppendToEventStore.cs

@ -7,8 +7,8 @@
// ========================================================================== // ==========================================================================
using System; using System;
using Benchmarks.Utils;
using MongoDB.Driver; using MongoDB.Driver;
using NodaTime;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.MongoDb.EventStore; using Squidex.Infrastructure.MongoDb.EventStore;
@ -20,14 +20,6 @@ namespace Benchmarks.Tests
private IMongoClient mongoClient; private IMongoClient mongoClient;
private IMongoDatabase mongoDatabase; private IMongoDatabase mongoDatabase;
private static readonly EventData EventData = new EventData
{
EventId = Guid.NewGuid(),
Metadata = "EventMetdata",
Payload = "EventPayload",
Type = "MyEvent"
};
public string Id public string Id
{ {
get { return "appendToEventStore"; } get { return "appendToEventStore"; }
@ -46,15 +38,14 @@ namespace Benchmarks.Tests
public void RunInitialize() public void RunInitialize()
{ {
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
mongoDatabase.CreateCollection("Test");
} }
public long Run() public long Run()
{ {
const long numCommits = 10; const long numCommits = 200;
const long eventStreams = 10; const long eventStreams = 10;
var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()), SystemClock.Instance); var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
for (var streamId = 0; streamId < eventStreams; streamId++) for (var streamId = 0; streamId < eventStreams; streamId++)
{ {
@ -63,7 +54,7 @@ namespace Benchmarks.Tests
for (var commitId = 0; commitId < numCommits; commitId++) for (var commitId = 0; commitId < numCommits; commitId++)
{ {
eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { EventData }).Wait(); eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait();
eventOffset++; eventOffset++;
} }

76
tests/Benchmarks/Tests/AppendToEventStoreParallel.cs

@ -0,0 +1,76 @@
// ==========================================================================
// AppendToEventStoreParallel.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Benchmarks.Utils;
using MongoDB.Driver;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.MongoDb.EventStore;
namespace Benchmarks.Tests
{
public sealed class AppendToEventStoreParallel : IBenchmark
{
private IMongoClient mongoClient;
private IMongoDatabase mongoDatabase;
public string Id
{
get { return "appendToEventStoreParallel"; }
}
public string Name
{
get { return "Append Events to EventStore Parallel"; }
}
public void Initialize()
{
mongoClient = new MongoClient("mongodb://localhost");
}
public void RunInitialize()
{
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
}
public long Run()
{
const long numCommits = 200;
const long eventStreams = 10;
var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
Parallel.For(0, eventStreams, streamId =>
{
var eventOffset = -1;
var streamName = streamId.ToString();
for (var commitId = 0; commitId < numCommits; commitId++)
{
eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait();
eventOffset++;
}
});
return numCommits * eventStreams;
}
public void RunCleanup()
{
mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName);
}
public void Cleanup()
{
}
}
}

22
tests/Benchmarks/Utils/Helper.cs

@ -0,0 +1,22 @@
// ==========================================================================
// Helper.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Squidex.Infrastructure.CQRS.Events;
namespace Benchmarks.Utils
{
public static class Helper
{
public static EventData CreateEventData()
{
return new EventData { EventId = Guid.NewGuid(), Metadata = "EventMetdata", Payload = "EventPayload", Type = "MyEvent" };
}
}
}

2
tests/RunCoverage.ps1

@ -50,6 +50,6 @@ New-Item -ItemType directory -Path $reportsFolder
-output:"$workingFolder\$reportsFolder\Read.xml" ` -output:"$workingFolder\$reportsFolder\Read.xml" `
-oldStyle -oldStyle
&"$userProfile\.nuget\packages\ReportGenerator\2.5.8\tools\ReportGenerator.exe" ` &"$userProfile\.nuget\packages\ReportGenerator\2.5.9\tools\ReportGenerator.exe" `
-reports:"$workingFolder\$reportsFolder\*.xml" ` -reports:"$workingFolder\$reportsFolder\*.xml" `
-targetdir:"$workingFolder\$reportsFolder\Output" -targetdir:"$workingFolder\$reportsFolder\Output"

5
tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj

@ -10,9 +10,10 @@
<ProjectReference Include="..\..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> <ProjectReference Include="..\..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="FluentAssertions" Version="4.19.2" /> <PackageReference Include="FluentAssertions" Version="4.19.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="Moq" Version="4.7.25" /> <PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
<PackageReference Include="xunit" Version="2.2.0" /> <PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup> </ItemGroup>

16
tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs

@ -74,7 +74,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
[Fact] [Fact]
public async Task Should_throw_exception_when_event_store_returns_no_events() public async Task Should_throw_exception_when_event_store_returns_no_events()
{ {
eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(Observable.Empty<StoredEvent>()); eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Observable.Empty<StoredEvent>());
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.GetByIdAsync<MyDomainObject>(aggregateId)); await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.GetByIdAsync<MyDomainObject>(aggregateId));
} }
@ -90,11 +90,11 @@ namespace Squidex.Infrastructure.CQRS.Commands
var events = new[] var events = new[]
{ {
new StoredEvent(0, 0, eventData1), new StoredEvent("0", 0, eventData1),
new StoredEvent(1, 1, eventData2) new StoredEvent("1", 1, eventData2)
}; };
eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable()); eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable());
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1)); eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2)); eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2));
@ -115,11 +115,11 @@ namespace Squidex.Infrastructure.CQRS.Commands
var events = new[] var events = new[]
{ {
new StoredEvent(0, 0, eventData1), new StoredEvent("0", 0, eventData1),
new StoredEvent(1, 1, eventData2) new StoredEvent("1", 1, eventData2)
}; };
eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable()); eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable());
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1)); eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2)); eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2));
@ -141,7 +141,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
eventDataFormatter.Setup(x => x.ToEventData(It.Is<Envelope<IEvent>>(e => e.Payload == event1), commitId)).Returns(eventData1); eventDataFormatter.Setup(x => x.ToEventData(It.Is<Envelope<IEvent>>(e => e.Payload == event1), commitId)).Returns(eventData1);
eventDataFormatter.Setup(x => x.ToEventData(It.Is<Envelope<IEvent>>(e => e.Payload == event2), commitId)).Returns(eventData2); eventDataFormatter.Setup(x => x.ToEventData(It.Is<Envelope<IEvent>>(e => e.Payload == event2), commitId)).Returns(eventData2);
eventStore.Setup(x => x.AppendEventsAsync(commitId, streamName, 123, It.Is<IEnumerable<EventData>>(e => e.Count() == 2))) eventStore.Setup(x => x.AppendEventsAsync(commitId, streamName, 123, It.Is<ICollection<EventData>>(e => e.Count() == 2)))
.Returns(TaskHelper.Done) .Returns(TaskHelper.Done)
.Verifiable(); .Verifiable();

8
tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs

@ -65,12 +65,12 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact] [Fact]
public void Should_set_and_get_event_number() public void Should_set_and_get_event_number()
{ {
const int eventNumber = 123; const string eventNumber = "123";
sut.SetEventNumber(eventNumber); sut.SetEventPosition(eventNumber);
Assert.Equal(eventNumber, sut.Headers.EventNumber()); Assert.Equal(eventNumber, sut.Headers.EventPosition());
Assert.Equal(eventNumber, sut.Headers["EventNumber"].ToInt32(culture)); Assert.Equal(eventNumber, sut.Headers["EventNumber"].ToString());
} }
[Fact] [Fact]

2
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs

@ -41,7 +41,7 @@ namespace Squidex.Infrastructure.CQRS.Events
inputEvent.SetAggregateId(Guid.NewGuid()); inputEvent.SetAggregateId(Guid.NewGuid());
inputEvent.SetCommitId(commitId); inputEvent.SetCommitId(commitId);
inputEvent.SetEventId(Guid.NewGuid()); inputEvent.SetEventId(Guid.NewGuid());
inputEvent.SetEventNumber(1); inputEvent.SetEventPosition("1");
inputEvent.SetEventStreamNumber(1); inputEvent.SetEventStreamNumber(1);
inputEvent.SetTimestamp(SystemClock.Instance.GetCurrentInstant()); inputEvent.SetTimestamp(SystemClock.Instance.GetCurrentInstant());

26
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

@ -27,13 +27,15 @@ namespace Squidex.Infrastructure.CQRS.Events
private sealed class MyEventConsumerInfo : IEventConsumerInfo private sealed class MyEventConsumerInfo : IEventConsumerInfo
{ {
public long LastHandledEventNumber { get; set; }
public bool IsStopped { get; set; } public bool IsStopped { get; set; }
public bool IsResetting { get; set; } public bool IsResetting { get; set; }
public string Name { get; set; } public string Name { get; set; }
public string Error { get; set; } public string Error { get; set; }
public string Position { get; set; }
} }
private sealed class MyEventStore : IEventStore private sealed class MyEventStore : IEventStore
@ -45,7 +47,7 @@ namespace Squidex.Infrastructure.CQRS.Events
this.storedEvents = storedEvents; this.storedEvents = storedEvents;
} }
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1) public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{ {
foreach (var @event in storedEvents) foreach (var @event in storedEvents)
{ {
@ -53,12 +55,12 @@ namespace Squidex.Infrastructure.CQRS.Events
} }
} }
public IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, long lastReceivedEventNumber = -1) public IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null)
{ {
throw new NotSupportedException(); throw new NotSupportedException();
} }
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events) public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{ {
throw new NotSupportedException(); throw new NotSupportedException();
} }
@ -83,9 +85,9 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
var events = new[] var events = new[]
{ {
new StoredEvent(3, 3, eventData1), new StoredEvent("3", 3, eventData1),
new StoredEvent(4, 4, eventData2), new StoredEvent("4", 4, eventData2),
new StoredEvent(5, 5, eventData3) new StoredEvent("5", 5, eventData3)
}; };
consumerName = eventConsumer.Object.GetType().Name; consumerName = eventConsumer.Object.GetType().Name;
@ -116,7 +118,7 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact] [Fact]
public void Should_subscribe_to_consumer_and_handle_events() public void Should_subscribe_to_consumer_and_handle_events()
{ {
consumerInfo.LastHandledEventNumber = 2L; consumerInfo.Position = "2";
sut.Subscribe(eventConsumer.Object); sut.Subscribe(eventConsumer.Object);
sut.Next(); sut.Next();
@ -130,7 +132,7 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact] [Fact]
public void Should_abort_if_handling_failed() public void Should_abort_if_handling_failed()
{ {
consumerInfo.LastHandledEventNumber = 2L; consumerInfo.Position = "2";
eventConsumer.Setup(x => x.On(envelope1)).Returns(TaskHelper.True); eventConsumer.Setup(x => x.On(envelope1)).Returns(TaskHelper.True);
eventConsumer.Setup(x => x.On(envelope2)).Throws(new InvalidOperationException()); eventConsumer.Setup(x => x.On(envelope2)).Throws(new InvalidOperationException());
@ -149,7 +151,7 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact] [Fact]
public void Should_abort_if_serialization_failed() public void Should_abort_if_serialization_failed()
{ {
consumerInfo.LastHandledEventNumber = 2L; consumerInfo.Position = "2";
formatter.Setup(x => x.Parse(eventData2)).Throws(new InvalidOperationException()); formatter.Setup(x => x.Parse(eventData2)).Throws(new InvalidOperationException());
@ -168,7 +170,7 @@ namespace Squidex.Infrastructure.CQRS.Events
public void Should_reset_if_requested() public void Should_reset_if_requested()
{ {
consumerInfo.IsResetting = true; consumerInfo.IsResetting = true;
consumerInfo.LastHandledEventNumber = 2L; consumerInfo.Position = "2";
sut.Subscribe(eventConsumer.Object); sut.Subscribe(eventConsumer.Object);
sut.Next(); sut.Next();

5
tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj

@ -8,11 +8,12 @@
<ProjectReference Include="..\..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> <ProjectReference Include="..\..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="FluentAssertions" Version="4.19.2" /> <PackageReference Include="FluentAssertions" Version="4.19.3" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="1.1.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="Moq" Version="4.7.25" /> <PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
<PackageReference Include="xunit" Version="2.2.0" /> <PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup> </ItemGroup>

5
tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj

@ -12,10 +12,11 @@
<ProjectReference Include="..\..\src\Squidex.Read.MongoDb\Squidex.Read.MongoDb.csproj" /> <ProjectReference Include="..\..\src\Squidex.Read.MongoDb\Squidex.Read.MongoDb.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="FluentAssertions" Version="4.19.2" /> <PackageReference Include="FluentAssertions" Version="4.19.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="MongoDB.Driver" Version="2.4.4" /> <PackageReference Include="MongoDB.Driver" Version="2.4.4" />
<PackageReference Include="Moq" Version="4.7.25" /> <PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
<PackageReference Include="xunit" Version="2.2.0" /> <PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup> </ItemGroup>

5
tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj

@ -11,10 +11,11 @@
<ProjectReference Include="..\..\src\Squidex.Write\Squidex.Write.csproj" /> <ProjectReference Include="..\..\src\Squidex.Write\Squidex.Write.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="FluentAssertions" Version="4.19.2" /> <PackageReference Include="FluentAssertions" Version="4.19.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="MongoDB.Driver" Version="2.4.4" /> <PackageReference Include="MongoDB.Driver" Version="2.4.4" />
<PackageReference Include="Moq" Version="4.7.25" /> <PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="System.ValueTuple" Version="4.3.1" />
<PackageReference Include="xunit" Version="2.2.0" /> <PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup> </ItemGroup>

9
tools/Migrate_01/Migrate_01.csproj

@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.4.4" />
</ItemGroup>
</Project>

94
tools/Migrate_01/Program.cs

@ -0,0 +1,94 @@
// ==========================================================================
// Program.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using MongoDB.Bson;
using MongoDB.Driver;
namespace Migrate_01
{
public class Program
{
public static void Main(string[] args)
{
Console.WriteLine("Migrate EventStore");
var mongoClient = new MongoClient(GetMongoConnectionValue());
var mongoDatabase = mongoClient.GetDatabase(GetMongoDatabaseValue());
var collection = mongoDatabase.GetCollection<BsonDocument>("Events");
Console.Write("Migrate Indices.....");
collection.Indexes.DropAll();
Console.WriteLine("DONE");
var query =
collection.Find(new BsonDocument())
.Project<BsonDocument>(
Builders<BsonDocument>.Projection.Include(Field("EventsOffset")))
.ToList();
Console.Write("Migrate Documents...");
foreach (var eventCommit in query)
{
var eventsOffset = (int)eventCommit["EventsOffset"].AsInt64;
var ts = new BsonTimestamp(eventsOffset + 10, 1);
collection.UpdateOne(
Builders<BsonDocument>.Filter
.Eq(Field<string>("_id"), eventCommit["_id"].AsString),
Builders<BsonDocument>.Update
.Set(Field<BsonTimestamp>("Timestamp"), ts).Unset(Field("EventsOffset")));
}
Console.WriteLine("DONE");
}
private static StringFieldDefinition<BsonDocument, T> Field<T>(string fieldName)
{
return new StringFieldDefinition<BsonDocument, T>(fieldName);
}
private static StringFieldDefinition<BsonDocument> Field(string fieldName)
{
return new StringFieldDefinition<BsonDocument>(fieldName);
}
private static string GetMongoConnectionValue()
{
Console.Write("Mongo Connection (ENTER for 'mongodb://localhost'): ");
var mongoConnection = Console.ReadLine();
if (string.IsNullOrWhiteSpace(mongoConnection))
{
mongoConnection = "mongodb://localhost";
}
return mongoConnection;
}
private static string GetMongoDatabaseValue()
{
Console.Write("Mongo Database (ENTER for 'Squidex'): ");
var mongoDatabase = Console.ReadLine();
if (string.IsNullOrWhiteSpace(mongoDatabase))
{
mongoDatabase = "Squidex";
}
return mongoDatabase;
}
}
}
Loading…
Cancel
Save