Browse Source

Memory optimizations. (#403)

pull/406/head
Sebastian Stehle 7 years ago
committed by GitHub
parent
commit
74f4e58fa0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs
  2. 2
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs
  3. 8
      src/Squidex.Domain.Apps.Entities/Assets/AssetGrain.cs
  4. 8
      src/Squidex.Domain.Apps.Entities/Contents/ContentGrain.cs
  5. 3
      src/Squidex.Domain.Apps.Entities/Context.cs
  6. 2
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  7. 23
      src/Squidex.Infrastructure.MongoDb/MongoDb/Batching.cs
  8. 11
      src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs
  9. 2
      src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs
  10. 50
      tools/LoadTest/ContentCreationBenchmarks.cs
  11. 71
      tools/LoadTest/ContentQueryBenchmarks.cs
  12. 73
      tools/LoadTest/Run.cs

2
src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs

@ -55,7 +55,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
{
using (Profiler.TraceMethod<MongoAssetRepository>())
{
await Collection.Find(new BsonDocument()).ForEachPipelineAsync(x => callback(Map(x), x.Version), ct);
await Collection.Find(new BsonDocument(), options: Batching.Options).ForEachPipelineAsync(x => callback(Map(x), x.Version), ct);
}
}

2
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs

@ -209,7 +209,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
public Task ReadAllAsync(Func<ContentState, long, Task> callback, Func<Guid, Guid, Task<ISchemaEntity>> getSchema, CancellationToken ct = default)
{
return Collection.Find(new BsonDocument()).ForEachPipelineAsync(async contentEntity =>
return Collection.Find(new BsonDocument(), options: Batching.Options).ForEachPipelineAsync(async contentEntity =>
{
var schema = await getSchema(contentEntity.IndexedAppId, contentEntity.IndexedSchemaId);

8
src/Squidex.Domain.Apps.Entities/Assets/AssetGrain.cs

@ -26,6 +26,7 @@ namespace Squidex.Domain.Apps.Entities.Assets
{
public sealed class AssetGrain : SquidexDomainObjectGrainLogSnapshots<AssetState>, IAssetGrain
{
private static readonly TimeSpan Lifetime = TimeSpan.FromMinutes(5);
private readonly ITagService tagService;
public AssetGrain(IStore<Guid> store, ITagService tagService, ISemanticLog log)
@ -36,6 +37,13 @@ namespace Squidex.Domain.Apps.Entities.Assets
this.tagService = tagService;
}
public override Task OnActivateAsync()
{
DelayDeactivation(Lifetime);
return base.OnActivateAsync();
}
protected override Task<object> ExecuteAsync(IAggregateCommand command)
{
VerifyNotDeleted();

8
src/Squidex.Domain.Apps.Entities/Contents/ContentGrain.cs

@ -28,6 +28,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
public sealed class ContentGrain : SquidexDomainObjectGrainLogSnapshots<ContentState>, IContentGrain
{
private static readonly TimeSpan Lifetime = TimeSpan.FromMinutes(5);
private readonly IAppProvider appProvider;
private readonly IAssetRepository assetRepository;
private readonly IContentRepository contentRepository;
@ -57,6 +58,13 @@ namespace Squidex.Domain.Apps.Entities.Contents
this.contentRepository = contentRepository;
}
public override Task OnActivateAsync()
{
DelayDeactivation(Lifetime);
return base.OnActivateAsync();
}
protected override Task<object> ExecuteAsync(IAggregateCommand command)
{
VerifyNotDeleted();

3
src/Squidex.Domain.Apps.Entities/Context.cs

@ -13,6 +13,7 @@ using Squidex.Infrastructure;
using Squidex.Infrastructure.Security;
using Squidex.Shared;
using Squidex.Shared.Identity;
using ClaimsPermissions = Squidex.Infrastructure.Security.PermissionSet;
namespace Squidex.Domain.Apps.Entities
{
@ -24,7 +25,7 @@ namespace Squidex.Domain.Apps.Entities
public ClaimsPrincipal User { get; }
public PermissionSet Permissions { get; private set; } = PermissionSet.Empty;
public ClaimsPermissions Permissions { get; private set; } = ClaimsPermissions.Empty;
public bool IsFrontendClient { get; private set; }

2
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -106,7 +106,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
using (Profiler.TraceMethod<MongoEventStore>())
{
await Collection.Find(filterDefinition).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
await Collection.Find(filterDefinition, options: Batching.Options).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
{
var eventStreamOffset = (int)commit.EventStreamOffset;

23
src/Squidex.Infrastructure.MongoDb/MongoDb/Batching.cs

@ -0,0 +1,23 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using MongoDB.Driver;
namespace Squidex.Infrastructure.MongoDb
{
public static class Batching
{
public const int BufferSize = 100;
public const int Size = BufferSize * 2;
public static readonly FindOptions Options = new FindOptions
{
BatchSize = Size
};
}
}

11
src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs

@ -161,9 +161,10 @@ namespace Squidex.Infrastructure.MongoDb
public static async Task ForEachPipelineAsync<TDocument>(this IAsyncCursorSource<TDocument> source, Func<TDocument, Task> processor, CancellationToken cancellationToken = default)
{
var cursor = await source.ToCursorAsync(cancellationToken);
await cursor.ForEachPipelineAsync(processor, cancellationToken);
using (var cursor = await source.ToCursorAsync(cancellationToken))
{
await cursor.ForEachPipelineAsync(processor, cancellationToken);
}
}
public static async Task ForEachPipelineAsync<TDocument>(this IAsyncCursor<TDocument> source, Func<TDocument, Task> processor, CancellationToken cancellationToken = default)
@ -184,12 +185,14 @@ namespace Squidex.Infrastructure.MongoDb
{
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
BoundedCapacity = 100
BoundedCapacity = Batching.BufferSize
});
try
{
await source.ForEachAsync(async i =>
{
var t = source;
if (!await actionBlock.SendAsync(i, combined.Token))
{
selfToken.Cancel();

2
src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs

@ -65,7 +65,7 @@ namespace Squidex.Infrastructure.States
{
using (Profiler.TraceMethod<MongoSnapshotStore<T, TKey>>())
{
await Collection.Find(new BsonDocument()).ForEachPipelineAsync(x => callback(x.Doc, x.Version), ct);
await Collection.Find(new BsonDocument(), options: Batching.Options).ForEachPipelineAsync(x => callback(x.Doc, x.Version), ct);
}
}

50
tools/LoadTest/ContentCreationBenchmarks.cs

@ -0,0 +1,50 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
namespace LoadTest
{
public class ContentCreationBenchmarks : IClassFixture<ClientQueryFixture>
{
public ClientQueryFixture Fixture { get; }
public ContentCreationBenchmarks(ClientQueryFixture fixture)
{
Fixture = fixture;
}
public static IEnumerable<object[]> Loads()
{
int[] users = { 1, 5, 10, 20, 50, 100 };
int[] loads = { 5, 10, 20, 50, 100 };
foreach (var user in users)
{
foreach (var load in loads)
{
yield return new object[] { user, load };
}
}
}
[Theory]
[MemberData(nameof(Loads))]
public async Task Should_create_items(int numUsers, int numIterationsPerUser)
{
var random = new Random();
await Run.Parallel(numUsers, numIterationsPerUser, async () =>
{
await Fixture.Client.CreateAsync(new TestEntityData { Value = random.Next() }, true);
});
}
}
}

71
tools/LoadTest/QueryBenchmarks.cs → tools/LoadTest/ContentQueryBenchmarks.cs

@ -5,23 +5,18 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Squidex.ClientLibrary;
using Xunit;
namespace LoadTest
{
public class QueryBenchmarks : IClassFixture<ClientQueryFixture>
public class ContentQueryBenchmarks : IClassFixture<ClientQueryFixture>
{
public ClientQueryFixture Fixture { get; }
public QueryBenchmarks(ClientQueryFixture fixture)
public ContentQueryBenchmarks(ClientQueryFixture fixture)
{
Fixture = fixture;
}
@ -44,7 +39,7 @@ namespace LoadTest
[MemberData(nameof(Loads))]
public async Task Should_return_all(int numUsers, int numIterationsPerUser)
{
await Run(numUsers, numIterationsPerUser, async () =>
await Run.Parallel(numUsers, numIterationsPerUser, async () =>
{
await Fixture.Client.GetAsync(new ODataQuery { OrderBy = "data/value/iv asc" });
});
@ -54,7 +49,7 @@ namespace LoadTest
[MemberData(nameof(Loads))]
public async Task Should_return_items_with_skip(int numUsers, int numIterationsPerUser)
{
await Run(numUsers, numIterationsPerUser, async () =>
await Run.Parallel(numUsers, numIterationsPerUser, async () =>
{
await Fixture.Client.GetAsync(new ODataQuery { Skip = 5, OrderBy = "data/value/iv asc" });
});
@ -64,7 +59,7 @@ namespace LoadTest
[MemberData(nameof(Loads))]
public async Task Should_return_items_with_skip_and_top(int numUsers, int numIterationsPerUser)
{
await Run(numUsers, numIterationsPerUser, async () =>
await Run.Parallel(numUsers, numIterationsPerUser, async () =>
{
await Fixture.Client.GetAsync(new ODataQuery { Skip = 2, Top = 5, OrderBy = "data/value/iv asc" });
});
@ -74,7 +69,7 @@ namespace LoadTest
[MemberData(nameof(Loads))]
public async Task Should_return_items_with_ordering(int numUsers, int numIterationsPerUser)
{
await Run(numUsers, numIterationsPerUser, async () =>
await Run.Parallel(numUsers, numIterationsPerUser, async () =>
{
await Fixture.Client.GetAsync(new ODataQuery { Skip = 2, Top = 5, OrderBy = "data/value/iv desc" });
});
@ -84,62 +79,10 @@ namespace LoadTest
[MemberData(nameof(Loads))]
public async Task Should_return_items_with_filter(int numUsers, int numIterationsPerUser)
{
await Run(numUsers, numIterationsPerUser, async () =>
await Run.Parallel(numUsers, numIterationsPerUser, async () =>
{
await Fixture.Client.GetAsync(new ODataQuery { Filter = "data/value/iv gt 3 and data/value/iv lt 7", OrderBy = "data/value/iv asc" });
});
}
private static async Task Run(int numUsers, int numIterationsPerUser, Func<Task> action, int expectedAvg = 100)
{
var elapsedMs = new ConcurrentBag<long>();
var errors = 0;
async Task RunAsync()
{
for (var i = 0; i < numIterationsPerUser; i++)
{
try
{
var watch = Stopwatch.StartNew();
await action();
watch.Stop();
elapsedMs.Add(watch.ElapsedMilliseconds);
}
catch
{
Interlocked.Increment(ref errors);
}
}
}
var tasks = new List<Task>();
for (var i = 0; i < numUsers; i++)
{
tasks.Add(Task.Run(RunAsync));
}
await Task.WhenAll(tasks);
var count = elapsedMs.Count;
var max = elapsedMs.Max();
var min = elapsedMs.Min();
var avg = elapsedMs.Average();
Assert.Equal(0, errors);
Assert.Equal(count, numUsers * numIterationsPerUser);
Assert.InRange(max, 0, expectedAvg * 10);
Assert.InRange(min, 0, expectedAvg);
Assert.InRange(avg, 0, expectedAvg);
}
}
}

73
tools/LoadTest/Run.cs

@ -0,0 +1,73 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace LoadTest
{
public static class Run
{
public static async Task Parallel(int numUsers, int numIterationsPerUser, Func<Task> action, int expectedAvg = 100)
{
var elapsedMs = new ConcurrentBag<long>();
var errors = 0;
async Task RunAsync()
{
for (var i = 0; i < numIterationsPerUser; i++)
{
try
{
var watch = Stopwatch.StartNew();
await action();
watch.Stop();
elapsedMs.Add(watch.ElapsedMilliseconds);
}
catch
{
Interlocked.Increment(ref errors);
}
}
}
var tasks = new List<Task>();
for (var i = 0; i < numUsers; i++)
{
tasks.Add(Task.Run(RunAsync));
}
await Task.WhenAll(tasks);
var count = elapsedMs.Count;
var max = elapsedMs.Max();
var min = elapsedMs.Min();
var avg = elapsedMs.Average();
Assert.Equal(0, errors);
Assert.Equal(count, numUsers * numIterationsPerUser);
Assert.InRange(max, 0, expectedAvg * 10);
Assert.InRange(min, 0, expectedAvg);
Assert.InRange(avg, 0, expectedAvg);
}
}
}
Loading…
Cancel
Save