diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs index c7b8e58e6..1341f48c9 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs @@ -15,6 +15,7 @@ using Squidex.Domain.Apps.Entities.Assets.Edm; using Squidex.Domain.Apps.Entities.Assets.Repositories; using Squidex.Domain.Apps.Entities.MongoDb.Assets.Visitors; using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.MongoDb; namespace Squidex.Domain.Apps.Entities.MongoDb.Assets @@ -43,64 +44,73 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets public async Task> QueryAsync(Guid appId, string query = null) { - try + using (Profiler.TraceMethod("QueryAsyncByQuery")) { - var odataQuery = EdmAssetModel.Edm.ParseQuery(query); + try + { + var odataQuery = EdmAssetModel.Edm.ParseQuery(query); - var filter = FindExtensions.BuildQuery(odataQuery, appId); + var filter = FindExtensions.BuildQuery(odataQuery, appId); - var contentCount = Collection.Find(filter).CountAsync(); - var contentItems = - Collection.Find(filter) - .AssetTake(odataQuery) - .AssetSkip(odataQuery) - .AssetSort(odataQuery) - .ToListAsync(); + var contentCount = Collection.Find(filter).CountAsync(); + var contentItems = + Collection.Find(filter) + .AssetTake(odataQuery) + .AssetSkip(odataQuery) + .AssetSort(odataQuery) + .ToListAsync(); - await Task.WhenAll(contentItems, contentCount); + await Task.WhenAll(contentItems, contentCount); - return ResultList.Create(contentItems.Result, contentCount.Result); - } - catch (NotSupportedException) - { - throw new ValidationException("This odata operation is not supported."); - } - catch (NotImplementedException) - { - throw new ValidationException("This odata operation is not supported."); - } - catch (MongoQueryException ex) - { - if (ex.Message.Contains("17406")) + return ResultList.Create(contentItems.Result, contentCount.Result); + } + catch (NotSupportedException) + { + throw new ValidationException("This odata operation is not supported."); + } + catch (NotImplementedException) { - throw new DomainException("Result set is too large to be retrieved. Use $top parameter to reduce the number of items."); + throw new ValidationException("This odata operation is not supported."); } - else + catch (MongoQueryException ex) { - throw; + if (ex.Message.Contains("17406")) + { + throw new DomainException("Result set is too large to be retrieved. Use $top parameter to reduce the number of items."); + } + else + { + throw; + } } } } public async Task> QueryAsync(Guid appId, HashSet ids) { - var find = Collection.Find(x => ids.Contains(x.Id)).SortByDescending(x => x.LastModified); + using (Profiler.TraceMethod("QueryAsyncByIds")) + { + var find = Collection.Find(x => ids.Contains(x.Id)).SortByDescending(x => x.LastModified); - var assetItems = find.ToListAsync(); - var assetCount = find.CountAsync(); + var assetItems = find.ToListAsync(); + var assetCount = find.CountAsync(); - await Task.WhenAll(assetItems, assetCount); + await Task.WhenAll(assetItems, assetCount); - return ResultList.Create(assetItems.Result.OfType().ToList(), assetCount.Result); + return ResultList.Create(assetItems.Result.OfType().ToList(), assetCount.Result); + } } public async Task FindAssetAsync(Guid id) { - var assetEntity = - await Collection.Find(x => x.Id == id) - .FirstOrDefaultAsync(); + using (Profiler.TraceMethod()) + { + var assetEntity = + await Collection.Find(x => x.Id == id) + .FirstOrDefaultAsync(); - return assetEntity; + return assetEntity; + } } } } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs index 49492c3bf..6cdbfecc6 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using MongoDB.Driver; using Squidex.Domain.Apps.Entities.Assets.State; using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Reflection; using Squidex.Infrastructure.States; @@ -24,26 +25,32 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets public async Task<(AssetState Value, long Version)> ReadAsync(Guid key) { - var existing = - await Collection.Find(x => x.Id == key) - .FirstOrDefaultAsync(); - - if (existing != null) + using (Profiler.TraceMethod()) { - return (SimpleMapper.Map(existing, new AssetState()), existing.Version); - } + var existing = + await Collection.Find(x => x.Id == key) + .FirstOrDefaultAsync(); + + if (existing != null) + { + return (SimpleMapper.Map(existing, new AssetState()), existing.Version); + } - return (null, EtagVersion.NotFound); + return (null, EtagVersion.NotFound); + } } public async Task WriteAsync(Guid key, AssetState value, long oldVersion, long newVersion) { - var entity = SimpleMapper.Map(value, new MongoAssetEntity()); + using (Profiler.TraceMethod()) + { + var entity = SimpleMapper.Map(value, new MongoAssetEntity()); - entity.Version = newVersion; - entity.IndexedAppId = value.AppId.Id; + entity.Version = newVersion; + entity.IndexedAppId = value.AppId.Id; - await Collection.ReplaceOneAsync(x => x.Id == key && x.Version == oldVersion, entity, Upsert); + await Collection.ReplaceOneAsync(x => x.Id == key && x.Version == oldVersion, entity, Upsert); + } } } } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs index 0230002fd..0654b12d8 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs @@ -17,6 +17,7 @@ using Squidex.Domain.Apps.Entities.Contents; using Squidex.Domain.Apps.Entities.Contents.Repositories; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { @@ -45,50 +46,65 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents contentsPublished.Initialize(); } - public Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, ODataUriParser odataQuery) + public async Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, ODataUriParser odataQuery) { - if (RequiresPublished(status)) + using (Profiler.TraceMethod("QueryAsyncByQuery")) { - return contentsPublished.QueryAsync(app, schema, odataQuery); - } - else - { - return contentsDraft.QueryAsync(app, schema, odataQuery, status, true); + if (RequiresPublished(status)) + { + return await contentsPublished.QueryAsync(app, schema, odataQuery); + } + else + { + return await contentsDraft.QueryAsync(app, schema, odataQuery, status, true); + } } } - public Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, HashSet ids) + public async Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, HashSet ids) { - if (RequiresPublished(status)) + using (Profiler.TraceMethod("QueryAsyncByIds")) { - return contentsPublished.QueryAsync(app, schema, ids); - } - else - { - return contentsDraft.QueryAsync(app, schema, ids, status); + if (RequiresPublished(status)) + { + return await contentsPublished.QueryAsync(app, schema, ids); + } + else + { + return await contentsDraft.QueryAsync(app, schema, ids, status); + } } } - public Task FindContentAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Guid id) + public async Task FindContentAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Guid id) { - if (RequiresPublished(status)) - { - return contentsPublished.FindContentAsync(app, schema, id); - } - else + using (Profiler.TraceMethod()) { - return contentsDraft.FindContentAsync(app, schema, id); + if (RequiresPublished(status)) + { + return await contentsPublished.FindContentAsync(app, schema, id); + } + else + { + return await contentsDraft.FindContentAsync(app, schema, id); + } } } - public Task> QueryNotFoundAsync(Guid appId, Guid schemaId, IList ids) + public async Task> QueryNotFoundAsync(Guid appId, Guid schemaId, IList ids) { - return contentsDraft.QueryNotFoundAsync(appId, schemaId, ids); + using (Profiler.TraceMethod()) + { + await contentsDraft.QueryNotFoundAsync(appId, schemaId, ids); + } } - public Task QueryScheduledWithoutDataAsync(Instant now, Func callback) + public async Task QueryScheduledWithoutDataAsync(Instant now, Func callback) { - return contentsDraft.QueryScheduledWithoutDataAsync(now, callback); + using (Profiler.TraceMethod()) + { + await contentsDraft.QueryScheduledWithoutDataAsync(now, callback); + } } public Task ClearAsync() diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs index ddb240e1c..5766a023b 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs @@ -12,6 +12,7 @@ using Squidex.Domain.Apps.Core.ConvertContent; using Squidex.Domain.Apps.Entities.Contents.State; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Reflection; using Squidex.Infrastructure.States; @@ -19,43 +20,49 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { public partial class MongoContentRepository : ISnapshotStore { - public Task<(ContentState Value, long Version)> ReadAsync(Guid key) + public async Task<(ContentState Value, long Version)> ReadAsync(Guid key) { - return contentsDraft.ReadAsync(key, GetSchemaAsync); + using (Profiler.TraceMethod()) + { + return await contentsDraft.ReadAsync(key, GetSchemaAsync); + } } public async Task WriteAsync(Guid key, ContentState value, long oldVersion, long newVersion) { - if (value.SchemaId.Id == Guid.Empty) + using (Profiler.TraceMethod()) { - return; - } + if (value.SchemaId.Id == Guid.Empty) + { + return; + } - var schema = await GetSchemaAsync(value.AppId.Id, value.SchemaId.Id); + var schema = await GetSchemaAsync(value.AppId.Id, value.SchemaId.Id); - var idData = value.Data.ToIdModel(schema.SchemaDef, true); + var idData = value.Data.ToIdModel(schema.SchemaDef, true); - var content = SimpleMapper.Map(value, new MongoContentEntity - { - DataByIds = idData, - DataDraftByIds = value.DataDraft?.ToIdModel(schema.SchemaDef, true), - IsDeleted = value.IsDeleted, - IndexedAppId = value.AppId.Id, - IndexedSchemaId = value.SchemaId.Id, - ReferencedIds = idData.ToReferencedIds(schema.SchemaDef), - ScheduledAt = value.ScheduleJob?.DueTime, - Version = newVersion - }); + var content = SimpleMapper.Map(value, new MongoContentEntity + { + DataByIds = idData, + DataDraftByIds = value.DataDraft?.ToIdModel(schema.SchemaDef, true), + IsDeleted = value.IsDeleted, + IndexedAppId = value.AppId.Id, + IndexedSchemaId = value.SchemaId.Id, + ReferencedIds = idData.ToReferencedIds(schema.SchemaDef), + ScheduledAt = value.ScheduleJob?.DueTime, + Version = newVersion + }); - await contentsDraft.UpsertAsync(content, oldVersion); + await contentsDraft.UpsertAsync(content, oldVersion); - if (value.Status == Status.Published && !value.IsDeleted) - { - await contentsPublished.UpsertAsync(content); - } - else - { - await contentsPublished.RemoveAsync(content.Id); + if (value.Status == Status.Published && !value.IsDeleted) + { + await contentsPublished.UpsertAsync(content); + } + else + { + await contentsPublished.RemoveAsync(content.Id); + } } } diff --git a/src/Squidex.Domain.Apps.Entities/Contents/ContentQueryService.cs b/src/Squidex.Domain.Apps.Entities/Contents/ContentQueryService.cs index 4ca7f298e..70f0da81a 100644 --- a/src/Squidex.Domain.Apps.Entities/Contents/ContentQueryService.cs +++ b/src/Squidex.Domain.Apps.Entities/Contents/ContentQueryService.cs @@ -20,6 +20,7 @@ using Squidex.Domain.Apps.Entities.Contents.Edm; using Squidex.Domain.Apps.Entities.Contents.Repositories; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Reflection; using Squidex.Infrastructure.Security; @@ -71,22 +72,25 @@ namespace Squidex.Domain.Apps.Entities.Contents var schema = await GetSchemaAsync(app, schemaIdOrName); - var isVersioned = version > EtagVersion.Empty; - var isFrontend = IsFrontendClient(user); + using (Profiler.TraceMethod()) + { + var isVersioned = version > EtagVersion.Empty; + var isFrontend = IsFrontendClient(user); - var parsedStatus = isFrontend ? StatusAll : StatusPublished; + var parsedStatus = isFrontend ? StatusAll : StatusPublished; - var content = - isVersioned ? - await FindContentByVersionAsync(id, version) : - await FindContentAsync(app, id, parsedStatus, schema); + var content = + isVersioned ? + await FindContentByVersionAsync(id, version) : + await FindContentAsync(app, id, parsedStatus, schema); - if (content == null || (content.Status != Status.Published && !isFrontend) || content.SchemaId.Id != schema.Id) - { - throw new DomainObjectNotFoundException(id.ToString(), typeof(ISchemaEntity)); - } + if (content == null || (content.Status != Status.Published && !isFrontend) || content.SchemaId.Id != schema.Id) + { + throw new DomainObjectNotFoundException(id.ToString(), typeof(ISchemaEntity)); + } - return TransformContent(app, schema, user, content, isFrontend, isVersioned); + return TransformContent(app, schema, user, content, isFrontend, isVersioned); + } } public async Task> QueryAsync(IAppEntity app, string schemaIdOrName, ClaimsPrincipal user, bool archived, string query) @@ -97,14 +101,17 @@ namespace Squidex.Domain.Apps.Entities.Contents var schema = await GetSchemaAsync(app, schemaIdOrName); - var isFrontend = IsFrontendClient(user); + using (Profiler.TraceMethod("QueryAsyncByQuery")) + { + var isFrontend = IsFrontendClient(user); - var parsedQuery = ParseQuery(app, query, schema); - var parsedStatus = ParseStatus(isFrontend, archived); + var parsedQuery = ParseQuery(app, query, schema); + var parsedStatus = ParseStatus(isFrontend, archived); - var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, parsedQuery); + var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, parsedQuery); - return TransformContents(app, schema, user, contents, false, isFrontend); + return TransformContents(app, schema, user, contents, false, isFrontend); + } } public async Task> QueryAsync(IAppEntity app, string schemaIdOrName, ClaimsPrincipal user, bool archived, HashSet ids) @@ -116,13 +123,16 @@ namespace Squidex.Domain.Apps.Entities.Contents var schema = await GetSchemaAsync(app, schemaIdOrName); - var isFrontend = IsFrontendClient(user); + using (Profiler.TraceMethod("QueryAsyncByIds")) + { + var isFrontend = IsFrontendClient(user); - var parsedStatus = ParseStatus(isFrontend, archived); + var parsedStatus = ParseStatus(isFrontend, archived); - var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, ids); + var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, ids); - return TransformContents(app, schema, user, contents, false, isFrontend); + return TransformContents(app, schema, user, contents, false, isFrontend); + } } private IContentEntity TransformContent(IAppEntity app, ISchemaEntity schema, ClaimsPrincipal user, @@ -148,44 +158,50 @@ namespace Squidex.Domain.Apps.Entities.Contents bool isTypeChecking, bool isFrontendClient) { - var scriptText = schema.ScriptQuery; - - var isScripting = !string.IsNullOrWhiteSpace(scriptText); - - foreach (var content in contents) + using (Profiler.TraceMethod()) { - var result = SimpleMapper.Map(content, new ContentEntity()); + var scriptText = schema.ScriptQuery; + + var isScripting = !string.IsNullOrWhiteSpace(scriptText); - if (result.Data != null) + foreach (var content in contents) { - if (!isFrontendClient && isScripting) + var result = SimpleMapper.Map(content, new ContentEntity()); + + if (result.Data != null) { - result.Data = scriptEngine.Transform(new ScriptContext { User = user, Data = content.Data, ContentId = content.Id }, scriptText); + if (!isFrontendClient && isScripting) + { + result.Data = scriptEngine.Transform(new ScriptContext { User = user, Data = content.Data, ContentId = content.Id }, scriptText); + } + + result.Data = result.Data.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking); } - result.Data = result.Data.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking); - } + if (result.DataDraft != null) + { + result.DataDraft = result.DataDraft.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking); + } - if (result.DataDraft != null) - { - result.DataDraft = result.DataDraft.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking); + yield return result; } - - yield return result; } } private ODataUriParser ParseQuery(IAppEntity app, string query, ISchemaEntity schema) { - try + using (Profiler.TraceMethod()) { - var model = modelBuilder.BuildEdmModel(schema, app); + try + { + var model = modelBuilder.BuildEdmModel(schema, app); - return model.ParseQuery(query); - } - catch (ODataException ex) - { - throw new ValidationException($"Failed to parse query: {ex.Message}", ex); + return model.ParseQuery(query); + } + catch (ODataException ex) + { + throw new ValidationException($"Failed to parse query: {ex.Message}", ex); + } } } diff --git a/src/Squidex.Domain.Apps.Entities/Contents/ContentVersionLoader.cs b/src/Squidex.Domain.Apps.Entities/Contents/ContentVersionLoader.cs index d7c0dd7f1..dca35ba34 100644 --- a/src/Squidex.Domain.Apps.Entities/Contents/ContentVersionLoader.cs +++ b/src/Squidex.Domain.Apps.Entities/Contents/ContentVersionLoader.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using Squidex.Domain.Apps.Core.Schemas; using Squidex.Domain.Apps.Entities.Contents.State; using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.States; namespace Squidex.Domain.Apps.Entities.Contents @@ -31,25 +32,28 @@ namespace Squidex.Domain.Apps.Entities.Contents public async Task LoadAsync(Guid id, long version) { - var content = new ContentState(); - - var persistence = store.WithEventSourcing(id, e => + using (Profiler.TraceMethod()) { - if (content.Version < version) + var content = new ContentState(); + + var persistence = store.WithEventSourcing(id, e => { - content = content.Apply(e); - content.Version++; - } - }); + if (content.Version < version) + { + content = content.Apply(e); + content.Version++; + } + }); - await persistence.ReadAsync(); + await persistence.ReadAsync(); - if (content.Version != version) - { - throw new DomainObjectNotFoundException(id.ToString(), typeof(IContentEntity)); - } + if (content.Version != version) + { + throw new DomainObjectNotFoundException(id.ToString(), typeof(IContentEntity)); + } - return content; + return content; + } } } } diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index 9e4d1a4d8..d21394747 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using EventStore.ClientAPI; +using Squidex.Infrastructure.Log; namespace Squidex.Infrastructure.EventSourcing { @@ -59,20 +60,26 @@ namespace Squidex.Infrastructure.EventSourcing public async Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken)) { - var streamName = await projectionClient.CreateProjectionAsync(property, value); + using (Profiler.TraceMethod()) + { + var streamName = await projectionClient.CreateProjectionAsync(property, value); - var sliceStart = projectionClient.ParsePosition(position); + var sliceStart = projectionClient.ParsePosition(position); - await QueryAsync(callback, streamName, sliceStart, ct); + await QueryAsync(callback, streamName, sliceStart, ct); + } } public async Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken)) { - var streamName = await projectionClient.CreateProjectionAsync(streamFilter); + using (Profiler.TraceMethod()) + { + var streamName = await projectionClient.CreateProjectionAsync(streamFilter); - var sliceStart = projectionClient.ParsePosition(position); + var sliceStart = projectionClient.ParsePosition(position); - await QueryAsync(callback, streamName, sliceStart, ct); + await QueryAsync(callback, streamName, sliceStart, ct); + } } private Task QueryAsync(Func callback, string streamName, long sliceStart, CancellationToken ct) @@ -82,30 +89,33 @@ namespace Squidex.Infrastructure.EventSourcing public async Task> QueryAsync(string streamName, long streamPosition = 0) { - var result = new List(); - - var sliceStart = streamPosition; - - StreamEventsSlice currentSlice; - do + using (Profiler.TraceMethod()) { - currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false); + var result = new List(); - if (currentSlice.Status == SliceReadStatus.Success) + var sliceStart = streamPosition; + + StreamEventsSlice currentSlice; + do { - sliceStart = currentSlice.NextEventNumber; + currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false); - foreach (var resolved in currentSlice.Events) + if (currentSlice.Status == SliceReadStatus.Success) { - var storedEvent = Formatter.Read(resolved); + sliceStart = currentSlice.NextEventNumber; + + foreach (var resolved in currentSlice.Events) + { + var storedEvent = Formatter.Read(resolved); - result.Add(storedEvent); + result.Add(storedEvent); + } } } - } - while (!currentSlice.IsEndOfStream); + while (!currentSlice.IsEndOfStream); - return result; + return result; + } } public Task AppendAsync(Guid commitId, string streamName, ICollection events) @@ -122,30 +132,33 @@ namespace Squidex.Infrastructure.EventSourcing private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection events) { - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - Guard.NotNull(events, nameof(events)); - - if (events.Count == 0) + using (Profiler.TraceMethod(nameof(AppendAsync))) { - return; - } + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); - var eventsToSave = events.Select(Formatter.Write).ToList(); + if (events.Count == 0) + { + return; + } - if (eventsToSave.Count < WritePageSize) - { - await connection.AppendToStreamAsync(GetStreamName(streamName), expectedVersion, eventsToSave); - } - else - { - using (var transaction = await connection.StartTransactionAsync(GetStreamName(streamName), expectedVersion)) + var eventsToSave = events.Select(Formatter.Write).ToList(); + + if (eventsToSave.Count < WritePageSize) + { + await connection.AppendToStreamAsync(GetStreamName(streamName), expectedVersion, eventsToSave); + } + else { - for (var p = 0; p < eventsToSave.Count; p += WritePageSize) + using (var transaction = await connection.StartTransactionAsync(GetStreamName(streamName), expectedVersion)) { - await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize)); - } + for (var p = 0; p < eventsToSave.Count; p += WritePageSize) + { + await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize)); + } - await transaction.CommitAsync(); + await transaction.CommitAsync(); + } } } } diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index d81be0421..649fee930 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.MongoDb; namespace Squidex.Infrastructure.EventSourcing @@ -31,37 +32,40 @@ namespace Squidex.Infrastructure.EventSourcing public async Task> QueryAsync(string streamName, long streamPosition = 0) { - var commits = - await Collection.Find( - Filter.And( - Filter.Eq(EventStreamField, streamName), - Filter.Gte(EventStreamOffsetField, streamPosition - 1))) - .Sort(Sort.Ascending(TimestampField)).ToListAsync(); - - var result = new List(); - - foreach (var commit in commits) + using (Profiler.TraceMethod()) { - var eventStreamOffset = (int)commit.EventStreamOffset; + var commits = + await Collection.Find( + Filter.And( + Filter.Eq(EventStreamField, streamName), + Filter.Gte(EventStreamOffsetField, streamPosition - 1))) + .Sort(Sort.Ascending(TimestampField)).ToListAsync(); - var commitTimestamp = commit.Timestamp; - var commitOffset = 0; + var result = new List(); - foreach (var e in commit.Events) + foreach (var commit in commits) { - eventStreamOffset++; + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; - if (eventStreamOffset >= streamPosition) + foreach (var e in commit.Events) { - var eventData = e.ToEventData(); - var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + eventStreamOffset++; - result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData)); + if (eventStreamOffset >= streamPosition) + { + var eventData = e.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData)); + } } } - } - return result; + return result; + } } public Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken)) @@ -86,30 +90,33 @@ namespace Squidex.Infrastructure.EventSourcing return QueryAsync(callback, lastPosition, filter, ct); } - private Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) + private async Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) { - return Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit => + using (Profiler.TraceMethod()) { - var eventStreamOffset = (int)commit.EventStreamOffset; - - var commitTimestamp = commit.Timestamp; - var commitOffset = 0; - - foreach (var e in commit.Events) + await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit => { - eventStreamOffset++; + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; - if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) + foreach (var e in commit.Events) { - var eventData = e.ToEventData(); - var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + eventStreamOffset++; - await callback(new StoredEvent(eventToken, eventStreamOffset, eventData)); + if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) + { + var eventData = e.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - commitOffset++; + await callback(new StoredEvent(eventToken, eventStreamOffset, eventData)); + + commitOffset++; + } } - } - }, ct); + }, ct); + } } private static FilterDefinition CreateFilter(string property, object value, StreamPosition streamPosition) diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index 937b66050..bbeb5c469 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -11,6 +11,7 @@ using System.Reactive.Linq; using System.Threading.Tasks; using MongoDB.Bson; using MongoDB.Driver; +using Squidex.Infrastructure.Log; namespace Squidex.Infrastructure.EventSourcing { @@ -26,58 +27,61 @@ namespace Squidex.Infrastructure.EventSourcing public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) { - Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - Guard.NotNull(events, nameof(events)); - - if (events.Count == 0) + using (Profiler.TraceMethod()) { - return; - } - - var currentVersion = await GetEventStreamOffset(streamName); + Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); - if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) - { - throw new WrongEventVersionException(currentVersion, expectedVersion); - } + if (events.Count == 0) + { + return; + } - var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); + var currentVersion = await GetEventStreamOffset(streamName); - for (var attempt = 0; attempt < MaxWriteAttempts; attempt++) - { - try + if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) { - await Collection.InsertOneAsync(commit); + throw new WrongEventVersionException(currentVersion, expectedVersion); + } - notifier.NotifyEventsStored(streamName); + var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); - return; - } - catch (MongoWriteException ex) + for (var attempt = 0; attempt < MaxWriteAttempts; attempt++) { - if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) + try { - currentVersion = await GetEventStreamOffset(streamName); + await Collection.InsertOneAsync(commit); - if (expectedVersion != EtagVersion.Any) - { - throw new WrongEventVersionException(currentVersion, expectedVersion); - } + notifier.NotifyEventsStored(streamName); - if (attempt < MaxWriteAttempts) + return; + } + catch (MongoWriteException ex) + { + if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) { - expectedVersion = currentVersion; + currentVersion = await GetEventStreamOffset(streamName); + + if (expectedVersion != EtagVersion.Any) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + if (attempt < MaxWriteAttempts) + { + expectedVersion = currentVersion; + } + else + { + throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); + } } else { - throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); + throw; } } - else - { - throw; - } } } } diff --git a/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs b/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs index 873f49a1c..960c7aeb8 100644 --- a/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using MongoDB.Bson; using MongoDB.Driver; using Newtonsoft.Json; +using Squidex.Infrastructure.Log; using Squidex.Infrastructure.MongoDb; namespace Squidex.Infrastructure.States @@ -37,26 +38,35 @@ namespace Squidex.Infrastructure.States public async Task<(T Value, long Version)> ReadAsync(TKey key) { - var existing = - await Collection.Find(x => x.Id.Equals(key)) - .FirstOrDefaultAsync(); - - if (existing != null) + using (Profiler.TraceMethod>()) { - return (existing.Doc, existing.Version); - } + var existing = + await Collection.Find(x => x.Id.Equals(key)) + .FirstOrDefaultAsync(); - return (default(T), EtagVersion.NotFound); + if (existing != null) + { + return (existing.Doc, existing.Version); + } + + return (default(T), EtagVersion.NotFound); + } } - public Task WriteAsync(TKey key, T value, long oldVersion, long newVersion) + public async Task WriteAsync(TKey key, T value, long oldVersion, long newVersion) { - return Collection.UpsertVersionedAsync(key, oldVersion, newVersion, u => u.Set(x => x.Doc, value)); + using (Profiler.TraceMethod>()) + { + await Collection.UpsertVersionedAsync(key, oldVersion, newVersion, u => u.Set(x => x.Doc, value)); + } } - public Task ReadAllAsync(System.Func callback) + public async Task ReadAllAsync(System.Func callback) { - return Collection.Find(new BsonDocument()).ForEachAsync(x => callback(x.Doc, x.Version)); + using (Profiler.TraceMethod>()) + { + return Collection.Find(new BsonDocument()).ForEachAsync(x => callback(x.Doc, x.Version)); + } } } }