mirror of https://github.com/Squidex/squidex.git
committed by
GitHub
18 changed files with 557 additions and 208 deletions
@ -0,0 +1,33 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System.Collections.Generic; |
||||
|
using MongoDB.Bson; |
||||
|
using MongoDB.Bson.Serialization.Attributes; |
||||
|
using NodaTime; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.Log |
||||
|
{ |
||||
|
public sealed class MongoRequest |
||||
|
{ |
||||
|
[BsonId] |
||||
|
[BsonElement] |
||||
|
public ObjectId Id { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public string Key { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public Instant Timestamp { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public Dictionary<string, string> Properties { get; set; } |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,77 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using MongoDB.Driver; |
||||
|
using NodaTime; |
||||
|
using Squidex.Infrastructure.Log.Store; |
||||
|
using Squidex.Infrastructure.MongoDb; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.Log |
||||
|
{ |
||||
|
public sealed class MongoRequestLogRepository : MongoRepositoryBase<MongoRequest>, IRequestLogRepository |
||||
|
{ |
||||
|
private static readonly InsertManyOptions Unordered = new InsertManyOptions { IsOrdered = false }; |
||||
|
|
||||
|
public MongoRequestLogRepository(IMongoDatabase database) |
||||
|
: base(database) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
protected override string CollectionName() |
||||
|
{ |
||||
|
return "RequestLog"; |
||||
|
} |
||||
|
|
||||
|
protected override Task SetupCollectionAsync(IMongoCollection<MongoRequest> collection, CancellationToken ct = default) |
||||
|
{ |
||||
|
return collection.Indexes.CreateManyAsync(new[] |
||||
|
{ |
||||
|
new CreateIndexModel<MongoRequest>( |
||||
|
Index |
||||
|
.Ascending(x => x.Key) |
||||
|
.Ascending(x => x.Timestamp)), |
||||
|
new CreateIndexModel<MongoRequest>( |
||||
|
Index |
||||
|
.Ascending(x => x.Timestamp), |
||||
|
new CreateIndexOptions |
||||
|
{ |
||||
|
ExpireAfter = TimeSpan.FromDays(90) |
||||
|
}), |
||||
|
}, ct); |
||||
|
} |
||||
|
|
||||
|
public Task InsertManyAsync(IEnumerable<Request> items) |
||||
|
{ |
||||
|
Guard.NotNull(items); |
||||
|
|
||||
|
var documents = items.Select(x => new MongoRequest { Key = x.Key, Timestamp = x.Timestamp, Properties = x.Properties }); |
||||
|
|
||||
|
return Collection.InsertManyAsync(documents, Unordered); |
||||
|
} |
||||
|
|
||||
|
public Task QueryAllAsync(Func<Request, Task> callback, string key, DateTime fromDate, DateTime toDate, CancellationToken ct = default) |
||||
|
{ |
||||
|
Guard.NotNull(callback); |
||||
|
Guard.NotNullOrEmpty(key); |
||||
|
|
||||
|
var timestampStart = Instant.FromDateTimeUtc(fromDate); |
||||
|
var timestampEnd = Instant.FromDateTimeUtc(toDate.AddDays(1)); |
||||
|
|
||||
|
return Collection.Find(x => x.Key == key && x.Timestamp >= timestampStart && x.Timestamp < timestampEnd).ForEachAsync(x => |
||||
|
{ |
||||
|
var request = new Request { Key = x.Key, Timestamp = x.Timestamp, Properties = x.Properties }; |
||||
|
|
||||
|
return callback(request); |
||||
|
}, ct); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -1,83 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|
||||
// All rights reserved. Licensed under the MIT license.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System; |
|
||||
using System.IO; |
|
||||
using System.Text; |
|
||||
using System.Threading; |
|
||||
using System.Threading.Tasks; |
|
||||
using Orleans; |
|
||||
using Squidex.Infrastructure.Orleans; |
|
||||
|
|
||||
namespace Squidex.Infrastructure.Log |
|
||||
{ |
|
||||
public sealed class LockingLogStore : ILogStore |
|
||||
{ |
|
||||
private static readonly byte[] LockedText = Encoding.UTF8.GetBytes("Another process is currenty running, try it again later."); |
|
||||
private static readonly TimeSpan LockWaitingTime = TimeSpan.FromMinutes(1); |
|
||||
private readonly ILogStore inner; |
|
||||
private readonly ILockGrain lockGrain; |
|
||||
|
|
||||
public LockingLogStore(ILogStore inner, IGrainFactory grainFactory) |
|
||||
{ |
|
||||
Guard.NotNull(inner); |
|
||||
Guard.NotNull(grainFactory); |
|
||||
|
|
||||
this.inner = inner; |
|
||||
|
|
||||
lockGrain = grainFactory.GetGrain<ILockGrain>(SingleGrain.Id); |
|
||||
} |
|
||||
|
|
||||
public Task ReadLogAsync(string key, DateTime from, DateTime to, Stream stream) |
|
||||
{ |
|
||||
return ReadLogAsync(key, from, to, stream, LockWaitingTime); |
|
||||
} |
|
||||
|
|
||||
public async Task ReadLogAsync(string key, DateTime from, DateTime to, Stream stream, TimeSpan lockTimeout) |
|
||||
{ |
|
||||
using (var cts = new CancellationTokenSource(lockTimeout)) |
|
||||
{ |
|
||||
string? releaseToken = null; |
|
||||
|
|
||||
while (!cts.IsCancellationRequested) |
|
||||
{ |
|
||||
releaseToken = await lockGrain.AcquireLockAsync(key); |
|
||||
|
|
||||
if (releaseToken != null) |
|
||||
{ |
|
||||
break; |
|
||||
} |
|
||||
|
|
||||
try |
|
||||
{ |
|
||||
await Task.Delay(2000, cts.Token); |
|
||||
} |
|
||||
catch (OperationCanceledException) |
|
||||
{ |
|
||||
break; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
if (releaseToken != null) |
|
||||
{ |
|
||||
try |
|
||||
{ |
|
||||
await inner.ReadLogAsync(key, from, to, stream); |
|
||||
} |
|
||||
finally |
|
||||
{ |
|
||||
await lockGrain.ReleaseLockAsync(releaseToken); |
|
||||
} |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
await stream.WriteAsync(LockedText, 0, LockedText.Length); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -0,0 +1,91 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Concurrent; |
||||
|
using System.Linq; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Squidex.Infrastructure.Tasks; |
||||
|
using Squidex.Infrastructure.Timers; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.Log.Store |
||||
|
{ |
||||
|
public sealed class BackgroundRequestLogStore : DisposableObjectBase, IRequestLogStore |
||||
|
{ |
||||
|
private const int Intervall = 10 * 1000; |
||||
|
private const int BatchSize = 1000; |
||||
|
private readonly IRequestLogRepository logRepository; |
||||
|
private readonly ISemanticLog log; |
||||
|
private readonly CompletionTimer timer; |
||||
|
private ConcurrentQueue<Request> jobs = new ConcurrentQueue<Request>(); |
||||
|
|
||||
|
public BackgroundRequestLogStore(IRequestLogRepository logRepository, ISemanticLog log) |
||||
|
{ |
||||
|
Guard.NotNull(logRepository); |
||||
|
Guard.NotNull(log); |
||||
|
|
||||
|
this.logRepository = logRepository; |
||||
|
this.log = log; |
||||
|
|
||||
|
timer = new CompletionTimer(Intervall, ct => TrackAsync(), Intervall); |
||||
|
} |
||||
|
|
||||
|
protected override void DisposeObject(bool disposing) |
||||
|
{ |
||||
|
if (disposing) |
||||
|
{ |
||||
|
timer.StopAsync().Wait(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void Next() |
||||
|
{ |
||||
|
ThrowIfDisposed(); |
||||
|
|
||||
|
timer.SkipCurrentDelay(); |
||||
|
} |
||||
|
|
||||
|
private async Task TrackAsync() |
||||
|
{ |
||||
|
try |
||||
|
{ |
||||
|
var localJobs = Interlocked.Exchange(ref jobs, new ConcurrentQueue<Request>()); |
||||
|
|
||||
|
if (localJobs.Count > 0) |
||||
|
{ |
||||
|
var pages = (int)Math.Ceiling((double)localJobs.Count / BatchSize); |
||||
|
|
||||
|
for (var i = 0; i < pages; i++) |
||||
|
{ |
||||
|
await logRepository.InsertManyAsync(localJobs.Skip(i * BatchSize).Take(BatchSize)); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
log.LogError(ex, w => w |
||||
|
.WriteProperty("action", "TrackUsage") |
||||
|
.WriteProperty("status", "Failed")); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public Task QueryAllAsync(Func<Request, Task> callback, string key, DateTime fromDate, DateTime toDate, CancellationToken ct = default) |
||||
|
{ |
||||
|
return logRepository.QueryAllAsync(callback, key, fromDate, toDate, ct); |
||||
|
} |
||||
|
|
||||
|
public Task LogAsync(Request request) |
||||
|
{ |
||||
|
Guard.NotNull(request); |
||||
|
|
||||
|
jobs.Enqueue(request); |
||||
|
|
||||
|
return TaskHelper.Done; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,23 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System.Collections.Generic; |
||||
|
using NodaTime; |
||||
|
|
||||
|
#pragma warning disable SA1401 // Fields should be private
|
||||
|
|
||||
|
namespace Squidex.Infrastructure.Log.Store |
||||
|
{ |
||||
|
public sealed class Request |
||||
|
{ |
||||
|
public Instant Timestamp; |
||||
|
|
||||
|
public string Key; |
||||
|
|
||||
|
public Dictionary<string, string> Properties; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,98 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.IO; |
||||
|
using System.Threading.Tasks; |
||||
|
using FakeItEasy; |
||||
|
using Squidex.Infrastructure.Log.Store; |
||||
|
using Xunit; |
||||
|
|
||||
|
namespace Squidex.Domain.Apps.Entities.Apps |
||||
|
{ |
||||
|
public class DefaultAppLogStoreTests |
||||
|
{ |
||||
|
private readonly IRequestLogStore requestLogStore = A.Fake<IRequestLogStore>(); |
||||
|
private readonly DefaultAppLogStore sut; |
||||
|
|
||||
|
public DefaultAppLogStoreTests() |
||||
|
{ |
||||
|
sut = new DefaultAppLogStore(requestLogStore); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Should_forward_request_log_to_store() |
||||
|
{ |
||||
|
Request? recordedRequest = null; |
||||
|
|
||||
|
A.CallTo(() => requestLogStore.LogAsync(A<Request>.Ignored)) |
||||
|
.Invokes((Request request) => recordedRequest = request); |
||||
|
|
||||
|
var clientId = "frontend"; |
||||
|
var costs = 2; |
||||
|
var elapsedMs = 120; |
||||
|
var requestMethod = "GET"; |
||||
|
var requestPath = "/my-path"; |
||||
|
var userId = "user1"; |
||||
|
|
||||
|
await sut.LogAsync(Guid.NewGuid(), default, requestMethod, requestPath, userId, clientId, elapsedMs, costs); |
||||
|
|
||||
|
Assert.NotNull(recordedRequest); |
||||
|
|
||||
|
Assert.Contains(clientId, recordedRequest!.Properties.Values); |
||||
|
Assert.Contains(costs.ToString(), recordedRequest!.Properties.Values); |
||||
|
Assert.Contains(elapsedMs.ToString(), recordedRequest!.Properties.Values); |
||||
|
Assert.Contains(requestMethod, recordedRequest!.Properties.Values); |
||||
|
Assert.Contains(requestPath, recordedRequest!.Properties.Values); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Should_create_some_stream() |
||||
|
{ |
||||
|
var dateFrom = DateTime.UtcNow.Date.AddDays(-30); |
||||
|
var dateTo = DateTime.UtcNow.Date; |
||||
|
|
||||
|
var appId = Guid.NewGuid(); |
||||
|
|
||||
|
A.CallTo(() => requestLogStore.QueryAllAsync(A<Func<Request, Task>>.Ignored, appId.ToString(), dateFrom, dateTo, default)) |
||||
|
.Invokes(x => |
||||
|
{ |
||||
|
var callback = x.GetArgument<Func<Request, Task>>(0); |
||||
|
|
||||
|
callback(CreateRecord()); |
||||
|
callback(CreateRecord()); |
||||
|
callback(CreateRecord()); |
||||
|
callback(CreateRecord()); |
||||
|
}); |
||||
|
|
||||
|
var stream = new MemoryStream(); |
||||
|
|
||||
|
await sut.ReadLogAsync(appId, dateFrom, dateTo, stream); |
||||
|
|
||||
|
stream.Position = 0; |
||||
|
|
||||
|
var lines = 0; |
||||
|
|
||||
|
using (var reader = new StreamReader(stream)) |
||||
|
{ |
||||
|
string? line = null; |
||||
|
while ((line = reader.ReadLine()) != null) |
||||
|
{ |
||||
|
lines++; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
Assert.Equal(5, lines); |
||||
|
} |
||||
|
|
||||
|
private static Request CreateRecord() |
||||
|
{ |
||||
|
return new Request { Properties = new Dictionary<string, string>() }; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -1,87 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|
||||
// All rights reserved. Licensed under the MIT license.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System; |
|
||||
using System.IO; |
|
||||
using System.Threading.Tasks; |
|
||||
using FakeItEasy; |
|
||||
using Orleans; |
|
||||
using Squidex.Infrastructure.Orleans; |
|
||||
using Xunit; |
|
||||
|
|
||||
namespace Squidex.Infrastructure.Log |
|
||||
{ |
|
||||
public class LockingLogStoreTests |
|
||||
{ |
|
||||
private readonly IGrainFactory grainFactory = A.Fake<IGrainFactory>(); |
|
||||
private readonly ILockGrain lockGrain = A.Fake<ILockGrain>(); |
|
||||
private readonly ILogStore inner = A.Fake<ILogStore>(); |
|
||||
private readonly LockingLogStore sut; |
|
||||
|
|
||||
public LockingLogStoreTests() |
|
||||
{ |
|
||||
A.CallTo(() => grainFactory.GetGrain<ILockGrain>(SingleGrain.Id, null)) |
|
||||
.Returns(lockGrain); |
|
||||
|
|
||||
sut = new LockingLogStore(inner, grainFactory); |
|
||||
} |
|
||||
|
|
||||
[Fact] |
|
||||
public async Task Should_lock_and_call_inner() |
|
||||
{ |
|
||||
var stream = new MemoryStream(); |
|
||||
|
|
||||
var dateFrom = DateTime.Today; |
|
||||
var dateTo = dateFrom.AddDays(2); |
|
||||
|
|
||||
var key = "MyKey"; |
|
||||
|
|
||||
var releaseToken = Guid.NewGuid().ToString(); |
|
||||
|
|
||||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|
||||
.Returns(releaseToken); |
|
||||
|
|
||||
await sut.ReadLogAsync(key, dateFrom, dateTo, stream); |
|
||||
|
|
||||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|
||||
.MustHaveHappened(); |
|
||||
|
|
||||
A.CallTo(() => lockGrain.ReleaseLockAsync(releaseToken)) |
|
||||
.MustHaveHappened(); |
|
||||
|
|
||||
A.CallTo(() => inner.ReadLogAsync(key, dateFrom, dateTo, stream)) |
|
||||
.MustHaveHappened(); |
|
||||
} |
|
||||
|
|
||||
[Fact] |
|
||||
public async Task Should_write_default_message_if_lock_could_not_be_acquired() |
|
||||
{ |
|
||||
var stream = new MemoryStream(); |
|
||||
|
|
||||
var dateFrom = DateTime.Today; |
|
||||
var dateTo = dateFrom.AddDays(2); |
|
||||
|
|
||||
var key = "MyKey"; |
|
||||
|
|
||||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|
||||
.Returns(Task.FromResult<string?>(null)); |
|
||||
|
|
||||
await sut.ReadLogAsync(key, dateFrom, dateTo, stream, TimeSpan.FromSeconds(1)); |
|
||||
|
|
||||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|
||||
.MustHaveHappened(); |
|
||||
|
|
||||
A.CallTo(() => lockGrain.ReleaseLockAsync(A<string>.Ignored)) |
|
||||
.MustNotHaveHappened(); |
|
||||
|
|
||||
A.CallTo(() => inner.ReadLogAsync(A<string>.Ignored, A<DateTime>.Ignored, A<DateTime>.Ignored, A<Stream>.Ignored)) |
|
||||
.MustNotHaveHappened(); |
|
||||
|
|
||||
Assert.True(stream.Length > 0); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -0,0 +1,52 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Threading.Tasks; |
||||
|
using FakeItEasy; |
||||
|
using Xunit; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.Log.Store |
||||
|
{ |
||||
|
public class BackgroundRequestLogStoreTests |
||||
|
{ |
||||
|
private readonly IRequestLogRepository requestLogRepository = A.Fake<IRequestLogRepository>(); |
||||
|
private readonly BackgroundRequestLogStore sut; |
||||
|
|
||||
|
public BackgroundRequestLogStoreTests() |
||||
|
{ |
||||
|
sut = new BackgroundRequestLogStore(requestLogRepository, A.Fake<ISemanticLog>()); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Should_log_in_batches() |
||||
|
{ |
||||
|
for (var i = 0; i < 2500; i++) |
||||
|
{ |
||||
|
await sut.LogAsync(new Request { Key = i.ToString() }); |
||||
|
} |
||||
|
|
||||
|
sut.Next(); |
||||
|
sut.Dispose(); |
||||
|
|
||||
|
A.CallTo(() => requestLogRepository.InsertManyAsync(Batch("0", "999"))) |
||||
|
.MustHaveHappened(); |
||||
|
|
||||
|
A.CallTo(() => requestLogRepository.InsertManyAsync(Batch("1000", "1999"))) |
||||
|
.MustHaveHappened(); |
||||
|
|
||||
|
A.CallTo(() => requestLogRepository.InsertManyAsync(Batch("2000", "2499"))) |
||||
|
.MustHaveHappened(); |
||||
|
} |
||||
|
|
||||
|
private static IEnumerable<Request> Batch(string from, string to) |
||||
|
{ |
||||
|
return A<IEnumerable<Request>>.That.Matches(x => x.First().Key == from && x.Last().Key == to); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue