mirror of https://github.com/Squidex/squidex.git
committed by
GitHub
25 changed files with 481 additions and 290 deletions
@ -0,0 +1,145 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using MongoDB.Driver; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Apps; |
|||
using Squidex.Domain.Apps.Entities.Schemas; |
|||
using Squidex.Domain.Apps.Events; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.MongoDb; |
|||
using Squidex.Infrastructure.ObjectPool; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.MongoDb.Schemas |
|||
{ |
|||
public sealed class MongoSchemasHash : MongoRepositoryBase<MongoSchemasHashEntity>, ISchemasHash, IEventConsumer |
|||
{ |
|||
public int BatchSize |
|||
{ |
|||
get { return 1000; } |
|||
} |
|||
|
|||
public int BatchDelay |
|||
{ |
|||
get { return 500; } |
|||
} |
|||
|
|||
public string Name |
|||
{ |
|||
get { return GetType().Name; } |
|||
} |
|||
|
|||
public string EventsFilter |
|||
{ |
|||
get { return "^(app-|schema-)"; } |
|||
} |
|||
|
|||
public MongoSchemasHash(IMongoDatabase database, bool setup = false) |
|||
: base(database, setup) |
|||
{ |
|||
} |
|||
|
|||
protected override string CollectionName() |
|||
{ |
|||
return "SchemasHash"; |
|||
} |
|||
|
|||
public Task On(IEnumerable<Envelope<IEvent>> events) |
|||
{ |
|||
var writes = new List<WriteModel<MongoSchemasHashEntity>>(); |
|||
|
|||
foreach (var @event in events) |
|||
{ |
|||
switch (@event.Payload) |
|||
{ |
|||
case SchemaEvent schemaEvent: |
|||
{ |
|||
writes.Add( |
|||
new UpdateOneModel<MongoSchemasHashEntity>( |
|||
Filter.Eq(x => x.AppId, schemaEvent.AppId.Id.ToString()), |
|||
Update |
|||
.Set($"s.{schemaEvent.SchemaId.Id}", @event.Headers.EventStreamNumber()) |
|||
.Set(x => x.Updated, @event.Headers.Timestamp()))); |
|||
break; |
|||
} |
|||
|
|||
case AppEvent appEvent: |
|||
writes.Add( |
|||
new UpdateOneModel<MongoSchemasHashEntity>( |
|||
Filter.Eq(x => x.AppId, appEvent.AppId.Id.ToString()), |
|||
Update |
|||
.Set(x => x.AppVersion, @event.Headers.EventStreamNumber()) |
|||
.Set(x => x.Updated, @event.Headers.Timestamp())) |
|||
{ |
|||
IsUpsert = true |
|||
}); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
if (writes.Count == 0) |
|||
{ |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
return Collection.BulkWriteAsync(writes); |
|||
} |
|||
|
|||
public async Task<(Instant Create, string Hash)> GetCurrentHashAsync(DomainId appId) |
|||
{ |
|||
var entity = await Collection.Find(x => x.AppId == appId.ToString()).FirstOrDefaultAsync(); |
|||
|
|||
if (entity == null) |
|||
{ |
|||
return (default, string.Empty); |
|||
} |
|||
|
|||
var ids = |
|||
entity.SchemaVersions.Select(x => (x.Key, x.Value)) |
|||
.Union(Enumerable.Repeat((entity.AppId, entity.AppVersion), 1)); |
|||
|
|||
var hash = CreateHash(ids); |
|||
|
|||
return (entity.Updated, hash); |
|||
} |
|||
|
|||
public ValueTask<string> ComputeHashAsync(IAppEntity app, IEnumerable<ISchemaEntity> schemas) |
|||
{ |
|||
var ids = |
|||
schemas.Select(x => (x.Id.ToString(), x.Version)) |
|||
.Union(Enumerable.Repeat((app.Id.ToString(), app.Version), 1)); |
|||
|
|||
var hash = CreateHash(ids); |
|||
|
|||
return new ValueTask<string>(hash); |
|||
} |
|||
|
|||
private static string CreateHash(IEnumerable<(string, long)> ids) |
|||
{ |
|||
var sb = DefaultPools.StringBuilder.Get(); |
|||
try |
|||
{ |
|||
foreach (var (id, version) in ids.OrderBy(x => x.Item1)) |
|||
{ |
|||
sb.Append(id); |
|||
sb.Append(version); |
|||
sb.Append(';'); |
|||
} |
|||
|
|||
return sb.ToString().Sha256Base64(); |
|||
} |
|||
finally |
|||
{ |
|||
DefaultPools.StringBuilder.Return(sb); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using MongoDB.Bson.Serialization.Attributes; |
|||
using NodaTime; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.MongoDb.Schemas |
|||
{ |
|||
public sealed class MongoSchemasHashEntity |
|||
{ |
|||
[BsonId] |
|||
[BsonElement] |
|||
public string AppId { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement("v")] |
|||
public long AppVersion { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement("s")] |
|||
public Dictionary<string, long> SchemaVersions { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement("t")] |
|||
public Instant Updated { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using System.Threading.Tasks; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Apps; |
|||
using Squidex.Infrastructure; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Schemas |
|||
{ |
|||
public interface ISchemasHash |
|||
{ |
|||
Task<(Instant Create, string Hash)> GetCurrentHashAsync(DomainId appId); |
|||
|
|||
ValueTask<string> ComputeHashAsync(IAppEntity app, IEnumerable<ISchemaEntity> schemas); |
|||
} |
|||
} |
|||
@ -1,29 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading; |
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public sealed class AsyncLocalCleaner<T> : IDisposable |
|||
{ |
|||
private readonly AsyncLocal<T> asyncLocal; |
|||
|
|||
public AsyncLocalCleaner(AsyncLocal<T> asyncLocal) |
|||
{ |
|||
Guard.NotNull(asyncLocal, nameof(asyncLocal)); |
|||
|
|||
this.asyncLocal = asyncLocal; |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
asyncLocal.Value = default!; |
|||
} |
|||
} |
|||
} |
|||
@ -1,73 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body
|
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public sealed class AsyncLock |
|||
{ |
|||
private readonly SemaphoreSlim semaphore; |
|||
|
|||
public AsyncLock() |
|||
{ |
|||
semaphore = new SemaphoreSlim(1); |
|||
} |
|||
|
|||
public Task<IDisposable> LockAsync() |
|||
{ |
|||
var wait = semaphore.WaitAsync(); |
|||
|
|||
if (wait.IsCompleted) |
|||
{ |
|||
return Task.FromResult((IDisposable)new LockReleaser(this)); |
|||
} |
|||
else |
|||
{ |
|||
return wait.ContinueWith(x => (IDisposable)new LockReleaser(this), |
|||
CancellationToken.None, |
|||
TaskContinuationOptions.ExecuteSynchronously, |
|||
TaskScheduler.Default); |
|||
} |
|||
} |
|||
|
|||
private class LockReleaser : IDisposable |
|||
{ |
|||
private AsyncLock? target; |
|||
|
|||
internal LockReleaser(AsyncLock target) |
|||
{ |
|||
this.target = target; |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
var current = target; |
|||
|
|||
if (current == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
target = null; |
|||
|
|||
try |
|||
{ |
|||
current.semaphore.Release(); |
|||
} |
|||
catch |
|||
{ |
|||
// just ignore the Exception
|
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,36 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public sealed class AsyncLockPool |
|||
{ |
|||
private readonly AsyncLock[] locks; |
|||
|
|||
public AsyncLockPool(int poolSize) |
|||
{ |
|||
Guard.GreaterThan(poolSize, 0, nameof(poolSize)); |
|||
|
|||
locks = new AsyncLock[poolSize]; |
|||
|
|||
for (var i = 0; i < poolSize; i++) |
|||
{ |
|||
locks[i] = new AsyncLock(); |
|||
} |
|||
} |
|||
|
|||
public Task<IDisposable> LockAsync(object target) |
|||
{ |
|||
Guard.NotNull(target, nameof(target)); |
|||
|
|||
return locks[Math.Abs(target.GetHashCode() % locks.Length)].LockAsync(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Threading.Tasks; |
|||
using MongoDB.Driver; |
|||
using Squidex.Domain.Apps.Entities.MongoDb.Schemas; |
|||
using Squidex.Infrastructure.MongoDb; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Schemas.MongoDb |
|||
{ |
|||
public sealed class SchemasHashFixture |
|||
{ |
|||
private readonly IMongoClient mongoClient = new MongoClient("mongodb://localhost"); |
|||
private readonly IMongoDatabase mongoDatabase; |
|||
|
|||
public MongoSchemasHash SchemasHash { get; } |
|||
|
|||
public SchemasHashFixture() |
|||
{ |
|||
InstantSerializer.Register(); |
|||
|
|||
mongoDatabase = mongoClient.GetDatabase("QueryTests"); |
|||
|
|||
var schemasHash = new MongoSchemasHash(mongoDatabase); |
|||
|
|||
Task.Run(async () => |
|||
{ |
|||
await schemasHash.InitializeAsync(); |
|||
}).Wait(); |
|||
|
|||
SchemasHash = schemasHash; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,110 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Threading.Tasks; |
|||
using FakeItEasy; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Apps; |
|||
using Squidex.Domain.Apps.Events.Apps; |
|||
using Squidex.Domain.Apps.Events.Schemas; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Xunit; |
|||
|
|||
#pragma warning disable SA1300 // Element should begin with upper-case letter
|
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Schemas.MongoDb |
|||
{ |
|||
[Trait("Category", "Dependencies")] |
|||
public class SchemasHashTests : IClassFixture<SchemasHashFixture> |
|||
{ |
|||
public SchemasHashFixture _ { get; } |
|||
|
|||
public SchemasHashTests(SchemasHashFixture fixture) |
|||
{ |
|||
_ = fixture; |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_compute_cache_independent_from_order() |
|||
{ |
|||
var app = CreateApp(DomainId.NewGuid(), 1); |
|||
|
|||
var schema1 = CreateSchema(DomainId.NewGuid(), 2); |
|||
var schema2 = CreateSchema(DomainId.NewGuid(), 3); |
|||
|
|||
var hash1 = await _.SchemasHash.ComputeHashAsync(app, new[] { schema1, schema2 }); |
|||
var hash2 = await _.SchemasHash.ComputeHashAsync(app, new[] { schema2, schema1 }); |
|||
|
|||
Assert.NotNull(hash1); |
|||
Assert.NotNull(hash2); |
|||
Assert.Equal(hash1, hash2); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_compute_cache_independent_from_db() |
|||
{ |
|||
var app = CreateApp(DomainId.NewGuid(), 1); |
|||
|
|||
var schema1 = CreateSchema(DomainId.NewGuid(), 2); |
|||
var schema2 = CreateSchema(DomainId.NewGuid(), 3); |
|||
|
|||
var timestamp = SystemClock.Instance.GetCurrentInstant().WithoutMs(); |
|||
|
|||
var computedHash = await _.SchemasHash.ComputeHashAsync(app, new[] { schema1, schema2 }); |
|||
|
|||
await _.SchemasHash.On(new[] |
|||
{ |
|||
Envelope.Create<IEvent>(new AppCreated |
|||
{ |
|||
AppId = NamedId.Of(app.Id, "my-app") |
|||
}).SetEventStreamNumber(app.Version).SetTimestamp(timestamp), |
|||
|
|||
Envelope.Create<IEvent>(new SchemaCreated |
|||
{ |
|||
AppId = NamedId.Of(app.Id, "my-app"), |
|||
SchemaId = NamedId.Of(schema1.Id, "my-schema") |
|||
}).SetEventStreamNumber(schema1.Version).SetTimestamp(timestamp), |
|||
|
|||
Envelope.Create<IEvent>(new SchemaCreated |
|||
{ |
|||
AppId = NamedId.Of(app.Id, "my-app"), |
|||
SchemaId = NamedId.Of(schema2.Id, "my-schema") |
|||
}).SetEventStreamNumber(schema2.Version).SetTimestamp(timestamp) |
|||
}); |
|||
|
|||
var (dbTime, dbHash) = await _.SchemasHash.GetCurrentHashAsync(app.Id); |
|||
|
|||
Assert.Equal(dbHash, computedHash); |
|||
Assert.Equal(dbTime, timestamp); |
|||
} |
|||
|
|||
private static IAppEntity CreateApp(DomainId id, long version) |
|||
{ |
|||
var app = A.Fake<IAppEntity>(); |
|||
|
|||
A.CallTo(() => app.Id) |
|||
.Returns(id); |
|||
A.CallTo(() => app.Version) |
|||
.Returns(version); |
|||
|
|||
return app; |
|||
} |
|||
|
|||
private static ISchemaEntity CreateSchema(DomainId id, long version) |
|||
{ |
|||
var schema = A.Fake<ISchemaEntity>(); |
|||
|
|||
A.CallTo(() => schema.Id) |
|||
.Returns(id); |
|||
A.CallTo(() => schema.Version) |
|||
.Returns(version); |
|||
|
|||
return schema; |
|||
} |
|||
} |
|||
} |
|||
@ -1,38 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public class AsyncLockPoolTests |
|||
{ |
|||
[Fact] |
|||
public async Task Should_lock() |
|||
{ |
|||
var sut = new AsyncLockPool(1); |
|||
|
|||
var value = 0; |
|||
|
|||
await Task.WhenAll( |
|||
Enumerable.Repeat(0, 100).Select(x => new Func<Task>(async () => |
|||
{ |
|||
using (await sut.LockAsync(1)) |
|||
{ |
|||
await Task.Yield(); |
|||
|
|||
value++; |
|||
} |
|||
})())); |
|||
|
|||
Assert.Equal(100, value); |
|||
} |
|||
} |
|||
} |
|||
@ -1,38 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public class AsyncLockTests |
|||
{ |
|||
[Fact] |
|||
public async Task Should_lock() |
|||
{ |
|||
var sut = new AsyncLock(); |
|||
|
|||
var value = 0; |
|||
|
|||
await Task.WhenAll( |
|||
Enumerable.Repeat(0, 100).Select(x => new Func<Task>(async () => |
|||
{ |
|||
using (await sut.LockAsync()) |
|||
{ |
|||
await Task.Yield(); |
|||
|
|||
value++; |
|||
} |
|||
})())); |
|||
|
|||
Assert.Equal(100, value); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue