mirror of https://github.com/Squidex/squidex.git
18 changed files with 557 additions and 208 deletions
@ -0,0 +1,33 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using MongoDB.Bson; |
|||
using MongoDB.Bson.Serialization.Attributes; |
|||
using NodaTime; |
|||
|
|||
namespace Squidex.Infrastructure.Log |
|||
{ |
|||
public sealed class MongoRequest |
|||
{ |
|||
[BsonId] |
|||
[BsonElement] |
|||
public ObjectId Id { get; set; } |
|||
|
|||
[BsonElement] |
|||
[BsonRequired] |
|||
public string Key { get; set; } |
|||
|
|||
[BsonElement] |
|||
[BsonRequired] |
|||
public Instant Timestamp { get; set; } |
|||
|
|||
[BsonElement] |
|||
[BsonRequired] |
|||
public Dictionary<string, string> Properties { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,77 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using MongoDB.Driver; |
|||
using NodaTime; |
|||
using Squidex.Infrastructure.Log.Store; |
|||
using Squidex.Infrastructure.MongoDb; |
|||
|
|||
namespace Squidex.Infrastructure.Log |
|||
{ |
|||
public sealed class MongoRequestLogRepository : MongoRepositoryBase<MongoRequest>, IRequestLogRepository |
|||
{ |
|||
private static readonly InsertManyOptions Unordered = new InsertManyOptions { IsOrdered = false }; |
|||
|
|||
public MongoRequestLogRepository(IMongoDatabase database) |
|||
: base(database) |
|||
{ |
|||
} |
|||
|
|||
protected override string CollectionName() |
|||
{ |
|||
return "RequestLog"; |
|||
} |
|||
|
|||
protected override Task SetupCollectionAsync(IMongoCollection<MongoRequest> collection, CancellationToken ct = default) |
|||
{ |
|||
return collection.Indexes.CreateManyAsync(new[] |
|||
{ |
|||
new CreateIndexModel<MongoRequest>( |
|||
Index |
|||
.Ascending(x => x.Key) |
|||
.Ascending(x => x.Timestamp)), |
|||
new CreateIndexModel<MongoRequest>( |
|||
Index |
|||
.Ascending(x => x.Timestamp), |
|||
new CreateIndexOptions |
|||
{ |
|||
ExpireAfter = TimeSpan.FromDays(90) |
|||
}), |
|||
}, ct); |
|||
} |
|||
|
|||
public Task InsertManyAsync(IEnumerable<Request> items) |
|||
{ |
|||
Guard.NotNull(items); |
|||
|
|||
var documents = items.Select(x => new MongoRequest { Key = x.Key, Timestamp = x.Timestamp, Properties = x.Properties }); |
|||
|
|||
return Collection.InsertManyAsync(documents, Unordered); |
|||
} |
|||
|
|||
public Task QueryAllAsync(Func<Request, Task> callback, string key, DateTime fromDate, DateTime toDate, CancellationToken ct = default) |
|||
{ |
|||
Guard.NotNull(callback); |
|||
Guard.NotNullOrEmpty(key); |
|||
|
|||
var timestampStart = Instant.FromDateTimeUtc(fromDate); |
|||
var timestampEnd = Instant.FromDateTimeUtc(toDate.AddDays(1)); |
|||
|
|||
return Collection.Find(x => x.Key == key && x.Timestamp >= timestampStart && x.Timestamp < timestampEnd).ForEachAsync(x => |
|||
{ |
|||
var request = new Request { Key = x.Key, Timestamp = x.Timestamp, Properties = x.Properties }; |
|||
|
|||
return callback(request); |
|||
}, ct); |
|||
} |
|||
} |
|||
} |
|||
@ -1,83 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.IO; |
|||
using System.Text; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Orleans; |
|||
using Squidex.Infrastructure.Orleans; |
|||
|
|||
namespace Squidex.Infrastructure.Log |
|||
{ |
|||
public sealed class LockingLogStore : ILogStore |
|||
{ |
|||
private static readonly byte[] LockedText = Encoding.UTF8.GetBytes("Another process is currenty running, try it again later."); |
|||
private static readonly TimeSpan LockWaitingTime = TimeSpan.FromMinutes(1); |
|||
private readonly ILogStore inner; |
|||
private readonly ILockGrain lockGrain; |
|||
|
|||
public LockingLogStore(ILogStore inner, IGrainFactory grainFactory) |
|||
{ |
|||
Guard.NotNull(inner); |
|||
Guard.NotNull(grainFactory); |
|||
|
|||
this.inner = inner; |
|||
|
|||
lockGrain = grainFactory.GetGrain<ILockGrain>(SingleGrain.Id); |
|||
} |
|||
|
|||
public Task ReadLogAsync(string key, DateTime from, DateTime to, Stream stream) |
|||
{ |
|||
return ReadLogAsync(key, from, to, stream, LockWaitingTime); |
|||
} |
|||
|
|||
public async Task ReadLogAsync(string key, DateTime from, DateTime to, Stream stream, TimeSpan lockTimeout) |
|||
{ |
|||
using (var cts = new CancellationTokenSource(lockTimeout)) |
|||
{ |
|||
string? releaseToken = null; |
|||
|
|||
while (!cts.IsCancellationRequested) |
|||
{ |
|||
releaseToken = await lockGrain.AcquireLockAsync(key); |
|||
|
|||
if (releaseToken != null) |
|||
{ |
|||
break; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
await Task.Delay(2000, cts.Token); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
break; |
|||
} |
|||
} |
|||
|
|||
if (releaseToken != null) |
|||
{ |
|||
try |
|||
{ |
|||
await inner.ReadLogAsync(key, from, to, stream); |
|||
} |
|||
finally |
|||
{ |
|||
await lockGrain.ReleaseLockAsync(releaseToken); |
|||
} |
|||
} |
|||
else |
|||
{ |
|||
await stream.WriteAsync(LockedText, 0, LockedText.Length); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,91 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Squidex.Infrastructure.Timers; |
|||
|
|||
namespace Squidex.Infrastructure.Log.Store |
|||
{ |
|||
public sealed class BackgroundRequestLogStore : DisposableObjectBase, IRequestLogStore |
|||
{ |
|||
private const int Intervall = 10 * 1000; |
|||
private const int BatchSize = 1000; |
|||
private readonly IRequestLogRepository logRepository; |
|||
private readonly ISemanticLog log; |
|||
private readonly CompletionTimer timer; |
|||
private ConcurrentQueue<Request> jobs = new ConcurrentQueue<Request>(); |
|||
|
|||
public BackgroundRequestLogStore(IRequestLogRepository logRepository, ISemanticLog log) |
|||
{ |
|||
Guard.NotNull(logRepository); |
|||
Guard.NotNull(log); |
|||
|
|||
this.logRepository = logRepository; |
|||
this.log = log; |
|||
|
|||
timer = new CompletionTimer(Intervall, ct => TrackAsync(), Intervall); |
|||
} |
|||
|
|||
protected override void DisposeObject(bool disposing) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
timer.StopAsync().Wait(); |
|||
} |
|||
} |
|||
|
|||
public void Next() |
|||
{ |
|||
ThrowIfDisposed(); |
|||
|
|||
timer.SkipCurrentDelay(); |
|||
} |
|||
|
|||
private async Task TrackAsync() |
|||
{ |
|||
try |
|||
{ |
|||
var localJobs = Interlocked.Exchange(ref jobs, new ConcurrentQueue<Request>()); |
|||
|
|||
if (localJobs.Count > 0) |
|||
{ |
|||
var pages = (int)Math.Ceiling((double)localJobs.Count / BatchSize); |
|||
|
|||
for (var i = 0; i < pages; i++) |
|||
{ |
|||
await logRepository.InsertManyAsync(localJobs.Skip(i * BatchSize).Take(BatchSize)); |
|||
} |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, w => w |
|||
.WriteProperty("action", "TrackUsage") |
|||
.WriteProperty("status", "Failed")); |
|||
} |
|||
} |
|||
|
|||
public Task QueryAllAsync(Func<Request, Task> callback, string key, DateTime fromDate, DateTime toDate, CancellationToken ct = default) |
|||
{ |
|||
return logRepository.QueryAllAsync(callback, key, fromDate, toDate, ct); |
|||
} |
|||
|
|||
public Task LogAsync(Request request) |
|||
{ |
|||
Guard.NotNull(request); |
|||
|
|||
jobs.Enqueue(request); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using NodaTime; |
|||
|
|||
#pragma warning disable SA1401 // Fields should be private
|
|||
|
|||
namespace Squidex.Infrastructure.Log.Store |
|||
{ |
|||
public sealed class Request |
|||
{ |
|||
public Instant Timestamp; |
|||
|
|||
public string Key; |
|||
|
|||
public Dictionary<string, string> Properties; |
|||
} |
|||
} |
|||
@ -0,0 +1,98 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.IO; |
|||
using System.Threading.Tasks; |
|||
using FakeItEasy; |
|||
using Squidex.Infrastructure.Log.Store; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Apps |
|||
{ |
|||
public class DefaultAppLogStoreTests |
|||
{ |
|||
private readonly IRequestLogStore requestLogStore = A.Fake<IRequestLogStore>(); |
|||
private readonly DefaultAppLogStore sut; |
|||
|
|||
public DefaultAppLogStoreTests() |
|||
{ |
|||
sut = new DefaultAppLogStore(requestLogStore); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_forward_request_log_to_store() |
|||
{ |
|||
Request? recordedRequest = null; |
|||
|
|||
A.CallTo(() => requestLogStore.LogAsync(A<Request>.Ignored)) |
|||
.Invokes((Request request) => recordedRequest = request); |
|||
|
|||
var clientId = "frontend"; |
|||
var costs = 2; |
|||
var elapsedMs = 120; |
|||
var requestMethod = "GET"; |
|||
var requestPath = "/my-path"; |
|||
var userId = "user1"; |
|||
|
|||
await sut.LogAsync(Guid.NewGuid(), default, requestMethod, requestPath, userId, clientId, elapsedMs, costs); |
|||
|
|||
Assert.NotNull(recordedRequest); |
|||
|
|||
Assert.Contains(clientId, recordedRequest!.Properties.Values); |
|||
Assert.Contains(costs.ToString(), recordedRequest!.Properties.Values); |
|||
Assert.Contains(elapsedMs.ToString(), recordedRequest!.Properties.Values); |
|||
Assert.Contains(requestMethod, recordedRequest!.Properties.Values); |
|||
Assert.Contains(requestPath, recordedRequest!.Properties.Values); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_create_some_stream() |
|||
{ |
|||
var dateFrom = DateTime.UtcNow.Date.AddDays(-30); |
|||
var dateTo = DateTime.UtcNow.Date; |
|||
|
|||
var appId = Guid.NewGuid(); |
|||
|
|||
A.CallTo(() => requestLogStore.QueryAllAsync(A<Func<Request, Task>>.Ignored, appId.ToString(), dateFrom, dateTo, default)) |
|||
.Invokes(x => |
|||
{ |
|||
var callback = x.GetArgument<Func<Request, Task>>(0); |
|||
|
|||
callback(CreateRecord()); |
|||
callback(CreateRecord()); |
|||
callback(CreateRecord()); |
|||
callback(CreateRecord()); |
|||
}); |
|||
|
|||
var stream = new MemoryStream(); |
|||
|
|||
await sut.ReadLogAsync(appId, dateFrom, dateTo, stream); |
|||
|
|||
stream.Position = 0; |
|||
|
|||
var lines = 0; |
|||
|
|||
using (var reader = new StreamReader(stream)) |
|||
{ |
|||
string? line = null; |
|||
while ((line = reader.ReadLine()) != null) |
|||
{ |
|||
lines++; |
|||
} |
|||
} |
|||
|
|||
Assert.Equal(5, lines); |
|||
} |
|||
|
|||
private static Request CreateRecord() |
|||
{ |
|||
return new Request { Properties = new Dictionary<string, string>() }; |
|||
} |
|||
} |
|||
} |
|||
@ -1,87 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.IO; |
|||
using System.Threading.Tasks; |
|||
using FakeItEasy; |
|||
using Orleans; |
|||
using Squidex.Infrastructure.Orleans; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Log |
|||
{ |
|||
public class LockingLogStoreTests |
|||
{ |
|||
private readonly IGrainFactory grainFactory = A.Fake<IGrainFactory>(); |
|||
private readonly ILockGrain lockGrain = A.Fake<ILockGrain>(); |
|||
private readonly ILogStore inner = A.Fake<ILogStore>(); |
|||
private readonly LockingLogStore sut; |
|||
|
|||
public LockingLogStoreTests() |
|||
{ |
|||
A.CallTo(() => grainFactory.GetGrain<ILockGrain>(SingleGrain.Id, null)) |
|||
.Returns(lockGrain); |
|||
|
|||
sut = new LockingLogStore(inner, grainFactory); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_lock_and_call_inner() |
|||
{ |
|||
var stream = new MemoryStream(); |
|||
|
|||
var dateFrom = DateTime.Today; |
|||
var dateTo = dateFrom.AddDays(2); |
|||
|
|||
var key = "MyKey"; |
|||
|
|||
var releaseToken = Guid.NewGuid().ToString(); |
|||
|
|||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|||
.Returns(releaseToken); |
|||
|
|||
await sut.ReadLogAsync(key, dateFrom, dateTo, stream); |
|||
|
|||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|||
.MustHaveHappened(); |
|||
|
|||
A.CallTo(() => lockGrain.ReleaseLockAsync(releaseToken)) |
|||
.MustHaveHappened(); |
|||
|
|||
A.CallTo(() => inner.ReadLogAsync(key, dateFrom, dateTo, stream)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_write_default_message_if_lock_could_not_be_acquired() |
|||
{ |
|||
var stream = new MemoryStream(); |
|||
|
|||
var dateFrom = DateTime.Today; |
|||
var dateTo = dateFrom.AddDays(2); |
|||
|
|||
var key = "MyKey"; |
|||
|
|||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|||
.Returns(Task.FromResult<string?>(null)); |
|||
|
|||
await sut.ReadLogAsync(key, dateFrom, dateTo, stream, TimeSpan.FromSeconds(1)); |
|||
|
|||
A.CallTo(() => lockGrain.AcquireLockAsync(key)) |
|||
.MustHaveHappened(); |
|||
|
|||
A.CallTo(() => lockGrain.ReleaseLockAsync(A<string>.Ignored)) |
|||
.MustNotHaveHappened(); |
|||
|
|||
A.CallTo(() => inner.ReadLogAsync(A<string>.Ignored, A<DateTime>.Ignored, A<DateTime>.Ignored, A<Stream>.Ignored)) |
|||
.MustNotHaveHappened(); |
|||
|
|||
Assert.True(stream.Length > 0); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using FakeItEasy; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Log.Store |
|||
{ |
|||
public class BackgroundRequestLogStoreTests |
|||
{ |
|||
private readonly IRequestLogRepository requestLogRepository = A.Fake<IRequestLogRepository>(); |
|||
private readonly BackgroundRequestLogStore sut; |
|||
|
|||
public BackgroundRequestLogStoreTests() |
|||
{ |
|||
sut = new BackgroundRequestLogStore(requestLogRepository, A.Fake<ISemanticLog>()); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_log_in_batches() |
|||
{ |
|||
for (var i = 0; i < 2500; i++) |
|||
{ |
|||
await sut.LogAsync(new Request { Key = i.ToString() }); |
|||
} |
|||
|
|||
sut.Next(); |
|||
sut.Dispose(); |
|||
|
|||
A.CallTo(() => requestLogRepository.InsertManyAsync(Batch("0", "999"))) |
|||
.MustHaveHappened(); |
|||
|
|||
A.CallTo(() => requestLogRepository.InsertManyAsync(Batch("1000", "1999"))) |
|||
.MustHaveHappened(); |
|||
|
|||
A.CallTo(() => requestLogRepository.InsertManyAsync(Batch("2000", "2499"))) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
private static IEnumerable<Request> Batch(string from, string to) |
|||
{ |
|||
return A<IEnumerable<Request>>.That.Matches(x => x.First().Key == from && x.Last().Key == to); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue