Browse Source

More profiling.

pull/289/head
Sebastian 8 years ago
parent
commit
783e7b7385
  1. 82
      src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs
  2. 31
      src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs
  3. 66
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs
  4. 59
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs
  5. 104
      src/Squidex.Domain.Apps.Entities/Contents/ContentQueryService.cs
  6. 32
      src/Squidex.Domain.Apps.Entities/Contents/ContentVersionLoader.cs
  7. 91
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  8. 81
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  9. 74
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
  10. 34
      src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs

82
src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs

@ -15,6 +15,7 @@ using Squidex.Domain.Apps.Entities.Assets.Edm;
using Squidex.Domain.Apps.Entities.Assets.Repositories;
using Squidex.Domain.Apps.Entities.MongoDb.Assets.Visitors;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb;
namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
@ -43,64 +44,73 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
public async Task<IResultList<IAssetEntity>> QueryAsync(Guid appId, string query = null)
{
try
using (Profiler.TraceMethod<MongoAssetRepository>("QueryAsyncByQuery"))
{
var odataQuery = EdmAssetModel.Edm.ParseQuery(query);
try
{
var odataQuery = EdmAssetModel.Edm.ParseQuery(query);
var filter = FindExtensions.BuildQuery(odataQuery, appId);
var filter = FindExtensions.BuildQuery(odataQuery, appId);
var contentCount = Collection.Find(filter).CountAsync();
var contentItems =
Collection.Find(filter)
.AssetTake(odataQuery)
.AssetSkip(odataQuery)
.AssetSort(odataQuery)
.ToListAsync();
var contentCount = Collection.Find(filter).CountAsync();
var contentItems =
Collection.Find(filter)
.AssetTake(odataQuery)
.AssetSkip(odataQuery)
.AssetSort(odataQuery)
.ToListAsync();
await Task.WhenAll(contentItems, contentCount);
await Task.WhenAll(contentItems, contentCount);
return ResultList.Create<IAssetEntity>(contentItems.Result, contentCount.Result);
}
catch (NotSupportedException)
{
throw new ValidationException("This odata operation is not supported.");
}
catch (NotImplementedException)
{
throw new ValidationException("This odata operation is not supported.");
}
catch (MongoQueryException ex)
{
if (ex.Message.Contains("17406"))
return ResultList.Create<IAssetEntity>(contentItems.Result, contentCount.Result);
}
catch (NotSupportedException)
{
throw new ValidationException("This odata operation is not supported.");
}
catch (NotImplementedException)
{
throw new DomainException("Result set is too large to be retrieved. Use $top parameter to reduce the number of items.");
throw new ValidationException("This odata operation is not supported.");
}
else
catch (MongoQueryException ex)
{
throw;
if (ex.Message.Contains("17406"))
{
throw new DomainException("Result set is too large to be retrieved. Use $top parameter to reduce the number of items.");
}
else
{
throw;
}
}
}
}
public async Task<IResultList<IAssetEntity>> QueryAsync(Guid appId, HashSet<Guid> ids)
{
var find = Collection.Find(x => ids.Contains(x.Id)).SortByDescending(x => x.LastModified);
using (Profiler.TraceMethod<MongoAssetRepository>("QueryAsyncByIds"))
{
var find = Collection.Find(x => ids.Contains(x.Id)).SortByDescending(x => x.LastModified);
var assetItems = find.ToListAsync();
var assetCount = find.CountAsync();
var assetItems = find.ToListAsync();
var assetCount = find.CountAsync();
await Task.WhenAll(assetItems, assetCount);
await Task.WhenAll(assetItems, assetCount);
return ResultList.Create(assetItems.Result.OfType<IAssetEntity>().ToList(), assetCount.Result);
return ResultList.Create(assetItems.Result.OfType<IAssetEntity>().ToList(), assetCount.Result);
}
}
public async Task<IAssetEntity> FindAssetAsync(Guid id)
{
var assetEntity =
await Collection.Find(x => x.Id == id)
.FirstOrDefaultAsync();
using (Profiler.TraceMethod<MongoAssetRepository>())
{
var assetEntity =
await Collection.Find(x => x.Id == id)
.FirstOrDefaultAsync();
return assetEntity;
return assetEntity;
}
}
}
}

31
src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs

@ -10,6 +10,7 @@ using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Domain.Apps.Entities.Assets.State;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.States;
@ -24,26 +25,32 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
public async Task<(AssetState Value, long Version)> ReadAsync(Guid key)
{
var existing =
await Collection.Find(x => x.Id == key)
.FirstOrDefaultAsync();
if (existing != null)
using (Profiler.TraceMethod<MongoAssetRepository>())
{
return (SimpleMapper.Map(existing, new AssetState()), existing.Version);
}
var existing =
await Collection.Find(x => x.Id == key)
.FirstOrDefaultAsync();
if (existing != null)
{
return (SimpleMapper.Map(existing, new AssetState()), existing.Version);
}
return (null, EtagVersion.NotFound);
return (null, EtagVersion.NotFound);
}
}
public async Task WriteAsync(Guid key, AssetState value, long oldVersion, long newVersion)
{
var entity = SimpleMapper.Map(value, new MongoAssetEntity());
using (Profiler.TraceMethod<MongoAssetRepository>())
{
var entity = SimpleMapper.Map(value, new MongoAssetEntity());
entity.Version = newVersion;
entity.IndexedAppId = value.AppId.Id;
entity.Version = newVersion;
entity.IndexedAppId = value.AppId.Id;
await Collection.ReplaceOneAsync(x => x.Id == key && x.Version == oldVersion, entity, Upsert);
await Collection.ReplaceOneAsync(x => x.Id == key && x.Version == oldVersion, entity, Upsert);
}
}
}
}

66
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs

@ -17,6 +17,7 @@ using Squidex.Domain.Apps.Entities.Contents;
using Squidex.Domain.Apps.Entities.Contents.Repositories;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
{
@ -45,50 +46,65 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
contentsPublished.Initialize();
}
public Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, ODataUriParser odataQuery)
public async Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, ODataUriParser odataQuery)
{
if (RequiresPublished(status))
using (Profiler.TraceMethod<MongoContentRepository>("QueryAsyncByQuery"))
{
return contentsPublished.QueryAsync(app, schema, odataQuery);
}
else
{
return contentsDraft.QueryAsync(app, schema, odataQuery, status, true);
if (RequiresPublished(status))
{
return await contentsPublished.QueryAsync(app, schema, odataQuery);
}
else
{
return await contentsDraft.QueryAsync(app, schema, odataQuery, status, true);
}
}
}
public Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, HashSet<Guid> ids)
public async Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, HashSet<Guid> ids)
{
if (RequiresPublished(status))
using (Profiler.TraceMethod<MongoContentRepository>("QueryAsyncByIds"))
{
return contentsPublished.QueryAsync(app, schema, ids);
}
else
{
return contentsDraft.QueryAsync(app, schema, ids, status);
if (RequiresPublished(status))
{
return await contentsPublished.QueryAsync(app, schema, ids);
}
else
{
return await contentsDraft.QueryAsync(app, schema, ids, status);
}
}
}
public Task<IContentEntity> FindContentAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Guid id)
public async Task<IContentEntity> FindContentAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Guid id)
{
if (RequiresPublished(status))
{
return contentsPublished.FindContentAsync(app, schema, id);
}
else
using (Profiler.TraceMethod<MongoContentRepository>())
{
return contentsDraft.FindContentAsync(app, schema, id);
if (RequiresPublished(status))
{
return await contentsPublished.FindContentAsync(app, schema, id);
}
else
{
return await contentsDraft.FindContentAsync(app, schema, id);
}
}
}
public Task<IReadOnlyList<Guid>> QueryNotFoundAsync(Guid appId, Guid schemaId, IList<Guid> ids)
public async Task<IReadOnlyList<Guid>> QueryNotFoundAsync(Guid appId, Guid schemaId, IList<Guid> ids)
{
return contentsDraft.QueryNotFoundAsync(appId, schemaId, ids);
using (Profiler.TraceMethod<MongoContentRepository>())
{
await contentsDraft.QueryNotFoundAsync(appId, schemaId, ids);
}
}
public Task QueryScheduledWithoutDataAsync(Instant now, Func<IContentEntity, Task> callback)
public async Task QueryScheduledWithoutDataAsync(Instant now, Func<IContentEntity, Task> callback)
{
return contentsDraft.QueryScheduledWithoutDataAsync(now, callback);
using (Profiler.TraceMethod<MongoContentRepository>())
{
await contentsDraft.QueryScheduledWithoutDataAsync(now, callback);
}
}
public Task ClearAsync()

59
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs

@ -12,6 +12,7 @@ using Squidex.Domain.Apps.Core.ConvertContent;
using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.States;
@ -19,43 +20,49 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
{
public partial class MongoContentRepository : ISnapshotStore<ContentState, Guid>
{
public Task<(ContentState Value, long Version)> ReadAsync(Guid key)
public async Task<(ContentState Value, long Version)> ReadAsync(Guid key)
{
return contentsDraft.ReadAsync(key, GetSchemaAsync);
using (Profiler.TraceMethod<MongoContentRepository>())
{
return await contentsDraft.ReadAsync(key, GetSchemaAsync);
}
}
public async Task WriteAsync(Guid key, ContentState value, long oldVersion, long newVersion)
{
if (value.SchemaId.Id == Guid.Empty)
using (Profiler.TraceMethod<MongoContentRepository>())
{
return;
}
if (value.SchemaId.Id == Guid.Empty)
{
return;
}
var schema = await GetSchemaAsync(value.AppId.Id, value.SchemaId.Id);
var schema = await GetSchemaAsync(value.AppId.Id, value.SchemaId.Id);
var idData = value.Data.ToIdModel(schema.SchemaDef, true);
var idData = value.Data.ToIdModel(schema.SchemaDef, true);
var content = SimpleMapper.Map(value, new MongoContentEntity
{
DataByIds = idData,
DataDraftByIds = value.DataDraft?.ToIdModel(schema.SchemaDef, true),
IsDeleted = value.IsDeleted,
IndexedAppId = value.AppId.Id,
IndexedSchemaId = value.SchemaId.Id,
ReferencedIds = idData.ToReferencedIds(schema.SchemaDef),
ScheduledAt = value.ScheduleJob?.DueTime,
Version = newVersion
});
var content = SimpleMapper.Map(value, new MongoContentEntity
{
DataByIds = idData,
DataDraftByIds = value.DataDraft?.ToIdModel(schema.SchemaDef, true),
IsDeleted = value.IsDeleted,
IndexedAppId = value.AppId.Id,
IndexedSchemaId = value.SchemaId.Id,
ReferencedIds = idData.ToReferencedIds(schema.SchemaDef),
ScheduledAt = value.ScheduleJob?.DueTime,
Version = newVersion
});
await contentsDraft.UpsertAsync(content, oldVersion);
await contentsDraft.UpsertAsync(content, oldVersion);
if (value.Status == Status.Published && !value.IsDeleted)
{
await contentsPublished.UpsertAsync(content);
}
else
{
await contentsPublished.RemoveAsync(content.Id);
if (value.Status == Status.Published && !value.IsDeleted)
{
await contentsPublished.UpsertAsync(content);
}
else
{
await contentsPublished.RemoveAsync(content.Id);
}
}
}

104
src/Squidex.Domain.Apps.Entities/Contents/ContentQueryService.cs

@ -20,6 +20,7 @@ using Squidex.Domain.Apps.Entities.Contents.Edm;
using Squidex.Domain.Apps.Entities.Contents.Repositories;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.Security;
@ -71,22 +72,25 @@ namespace Squidex.Domain.Apps.Entities.Contents
var schema = await GetSchemaAsync(app, schemaIdOrName);
var isVersioned = version > EtagVersion.Empty;
var isFrontend = IsFrontendClient(user);
using (Profiler.TraceMethod<ContentQueryService>())
{
var isVersioned = version > EtagVersion.Empty;
var isFrontend = IsFrontendClient(user);
var parsedStatus = isFrontend ? StatusAll : StatusPublished;
var parsedStatus = isFrontend ? StatusAll : StatusPublished;
var content =
isVersioned ?
await FindContentByVersionAsync(id, version) :
await FindContentAsync(app, id, parsedStatus, schema);
var content =
isVersioned ?
await FindContentByVersionAsync(id, version) :
await FindContentAsync(app, id, parsedStatus, schema);
if (content == null || (content.Status != Status.Published && !isFrontend) || content.SchemaId.Id != schema.Id)
{
throw new DomainObjectNotFoundException(id.ToString(), typeof(ISchemaEntity));
}
if (content == null || (content.Status != Status.Published && !isFrontend) || content.SchemaId.Id != schema.Id)
{
throw new DomainObjectNotFoundException(id.ToString(), typeof(ISchemaEntity));
}
return TransformContent(app, schema, user, content, isFrontend, isVersioned);
return TransformContent(app, schema, user, content, isFrontend, isVersioned);
}
}
public async Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, string schemaIdOrName, ClaimsPrincipal user, bool archived, string query)
@ -97,14 +101,17 @@ namespace Squidex.Domain.Apps.Entities.Contents
var schema = await GetSchemaAsync(app, schemaIdOrName);
var isFrontend = IsFrontendClient(user);
using (Profiler.TraceMethod<ContentQueryService>("QueryAsyncByQuery"))
{
var isFrontend = IsFrontendClient(user);
var parsedQuery = ParseQuery(app, query, schema);
var parsedStatus = ParseStatus(isFrontend, archived);
var parsedQuery = ParseQuery(app, query, schema);
var parsedStatus = ParseStatus(isFrontend, archived);
var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, parsedQuery);
var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, parsedQuery);
return TransformContents(app, schema, user, contents, false, isFrontend);
return TransformContents(app, schema, user, contents, false, isFrontend);
}
}
public async Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, string schemaIdOrName, ClaimsPrincipal user, bool archived, HashSet<Guid> ids)
@ -116,13 +123,16 @@ namespace Squidex.Domain.Apps.Entities.Contents
var schema = await GetSchemaAsync(app, schemaIdOrName);
var isFrontend = IsFrontendClient(user);
using (Profiler.TraceMethod<ContentQueryService>("QueryAsyncByIds"))
{
var isFrontend = IsFrontendClient(user);
var parsedStatus = ParseStatus(isFrontend, archived);
var parsedStatus = ParseStatus(isFrontend, archived);
var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, ids);
var contents = await contentRepository.QueryAsync(app, schema, parsedStatus, ids);
return TransformContents(app, schema, user, contents, false, isFrontend);
return TransformContents(app, schema, user, contents, false, isFrontend);
}
}
private IContentEntity TransformContent(IAppEntity app, ISchemaEntity schema, ClaimsPrincipal user,
@ -148,44 +158,50 @@ namespace Squidex.Domain.Apps.Entities.Contents
bool isTypeChecking,
bool isFrontendClient)
{
var scriptText = schema.ScriptQuery;
var isScripting = !string.IsNullOrWhiteSpace(scriptText);
foreach (var content in contents)
using (Profiler.TraceMethod<ContentQueryService>())
{
var result = SimpleMapper.Map(content, new ContentEntity());
var scriptText = schema.ScriptQuery;
var isScripting = !string.IsNullOrWhiteSpace(scriptText);
if (result.Data != null)
foreach (var content in contents)
{
if (!isFrontendClient && isScripting)
var result = SimpleMapper.Map(content, new ContentEntity());
if (result.Data != null)
{
result.Data = scriptEngine.Transform(new ScriptContext { User = user, Data = content.Data, ContentId = content.Id }, scriptText);
if (!isFrontendClient && isScripting)
{
result.Data = scriptEngine.Transform(new ScriptContext { User = user, Data = content.Data, ContentId = content.Id }, scriptText);
}
result.Data = result.Data.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking);
}
result.Data = result.Data.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking);
}
if (result.DataDraft != null)
{
result.DataDraft = result.DataDraft.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking);
}
if (result.DataDraft != null)
{
result.DataDraft = result.DataDraft.ToApiModel(schema.SchemaDef, app.LanguagesConfig, isFrontendClient, isTypeChecking);
yield return result;
}
yield return result;
}
}
private ODataUriParser ParseQuery(IAppEntity app, string query, ISchemaEntity schema)
{
try
using (Profiler.TraceMethod<ContentQueryService>())
{
var model = modelBuilder.BuildEdmModel(schema, app);
try
{
var model = modelBuilder.BuildEdmModel(schema, app);
return model.ParseQuery(query);
}
catch (ODataException ex)
{
throw new ValidationException($"Failed to parse query: {ex.Message}", ex);
return model.ParseQuery(query);
}
catch (ODataException ex)
{
throw new ValidationException($"Failed to parse query: {ex.Message}", ex);
}
}
}

32
src/Squidex.Domain.Apps.Entities/Contents/ContentVersionLoader.cs

@ -10,6 +10,7 @@ using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.Schemas;
using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities.Contents
@ -31,25 +32,28 @@ namespace Squidex.Domain.Apps.Entities.Contents
public async Task<IContentEntity> LoadAsync(Guid id, long version)
{
var content = new ContentState();
var persistence = store.WithEventSourcing<ContentGrain, Guid>(id, e =>
using (Profiler.TraceMethod<ContentVersionLoader>())
{
if (content.Version < version)
var content = new ContentState();
var persistence = store.WithEventSourcing<ContentGrain, Guid>(id, e =>
{
content = content.Apply(e);
content.Version++;
}
});
if (content.Version < version)
{
content = content.Apply(e);
content.Version++;
}
});
await persistence.ReadAsync();
await persistence.ReadAsync();
if (content.Version != version)
{
throw new DomainObjectNotFoundException(id.ToString(), typeof(IContentEntity));
}
if (content.Version != version)
{
throw new DomainObjectNotFoundException(id.ToString(), typeof(IContentEntity));
}
return content;
return content;
}
}
}
}

91
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -11,6 +11,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.EventSourcing
{
@ -59,20 +60,26 @@ namespace Squidex.Infrastructure.EventSourcing
public async Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken))
{
var streamName = await projectionClient.CreateProjectionAsync(property, value);
using (Profiler.TraceMethod<GetEventStore>())
{
var streamName = await projectionClient.CreateProjectionAsync(property, value);
var sliceStart = projectionClient.ParsePosition(position);
var sliceStart = projectionClient.ParsePosition(position);
await QueryAsync(callback, streamName, sliceStart, ct);
await QueryAsync(callback, streamName, sliceStart, ct);
}
}
public async Task QueryAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken))
{
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
using (Profiler.TraceMethod<GetEventStore>())
{
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
var sliceStart = projectionClient.ParsePosition(position);
var sliceStart = projectionClient.ParsePosition(position);
await QueryAsync(callback, streamName, sliceStart, ct);
await QueryAsync(callback, streamName, sliceStart, ct);
}
}
private Task QueryAsync(Func<StoredEvent, Task> callback, string streamName, long sliceStart, CancellationToken ct)
@ -82,30 +89,33 @@ namespace Squidex.Infrastructure.EventSourcing
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0)
{
var result = new List<StoredEvent>();
var sliceStart = streamPosition;
StreamEventsSlice currentSlice;
do
using (Profiler.TraceMethod<GetEventStore>())
{
currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false);
var result = new List<StoredEvent>();
if (currentSlice.Status == SliceReadStatus.Success)
var sliceStart = streamPosition;
StreamEventsSlice currentSlice;
do
{
sliceStart = currentSlice.NextEventNumber;
currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false);
foreach (var resolved in currentSlice.Events)
if (currentSlice.Status == SliceReadStatus.Success)
{
var storedEvent = Formatter.Read(resolved);
sliceStart = currentSlice.NextEventNumber;
foreach (var resolved in currentSlice.Events)
{
var storedEvent = Formatter.Read(resolved);
result.Add(storedEvent);
result.Add(storedEvent);
}
}
}
}
while (!currentSlice.IsEndOfStream);
while (!currentSlice.IsEndOfStream);
return result;
return result;
}
}
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events)
@ -122,30 +132,33 @@ namespace Squidex.Infrastructure.EventSourcing
private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
if (events.Count == 0)
using (Profiler.TraceMethod<GetEventStore>(nameof(AppendAsync)))
{
return;
}
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
var eventsToSave = events.Select(Formatter.Write).ToList();
if (events.Count == 0)
{
return;
}
if (eventsToSave.Count < WritePageSize)
{
await connection.AppendToStreamAsync(GetStreamName(streamName), expectedVersion, eventsToSave);
}
else
{
using (var transaction = await connection.StartTransactionAsync(GetStreamName(streamName), expectedVersion))
var eventsToSave = events.Select(Formatter.Write).ToList();
if (eventsToSave.Count < WritePageSize)
{
await connection.AppendToStreamAsync(GetStreamName(streamName), expectedVersion, eventsToSave);
}
else
{
for (var p = 0; p < eventsToSave.Count; p += WritePageSize)
using (var transaction = await connection.StartTransactionAsync(GetStreamName(streamName), expectedVersion))
{
await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize));
}
for (var p = 0; p < eventsToSave.Count; p += WritePageSize)
{
await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize));
}
await transaction.CommitAsync();
await transaction.CommitAsync();
}
}
}
}

81
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -10,6 +10,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb;
namespace Squidex.Infrastructure.EventSourcing
@ -31,37 +32,40 @@ namespace Squidex.Infrastructure.EventSourcing
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0)
{
var commits =
await Collection.Find(
Filter.And(
Filter.Eq(EventStreamField, streamName),
Filter.Gte(EventStreamOffsetField, streamPosition - 1)))
.Sort(Sort.Ascending(TimestampField)).ToListAsync();
var result = new List<StoredEvent>();
foreach (var commit in commits)
using (Profiler.TraceMethod<MongoEventStore>())
{
var eventStreamOffset = (int)commit.EventStreamOffset;
var commits =
await Collection.Find(
Filter.And(
Filter.Eq(EventStreamField, streamName),
Filter.Gte(EventStreamOffsetField, streamPosition - 1)))
.Sort(Sort.Ascending(TimestampField)).ToListAsync();
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
var result = new List<StoredEvent>();
foreach (var e in commit.Events)
foreach (var commit in commits)
{
eventStreamOffset++;
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
if (eventStreamOffset >= streamPosition)
foreach (var e in commit.Events)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
eventStreamOffset++;
result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData));
if (eventStreamOffset >= streamPosition)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData));
}
}
}
}
return result;
return result;
}
}
public Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken))
@ -86,30 +90,33 @@ namespace Squidex.Infrastructure.EventSourcing
return QueryAsync(callback, lastPosition, filter, ct);
}
private Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct)
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct)
{
return Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
using (Profiler.TraceMethod<MongoEventStore>())
{
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
foreach (var e in commit.Events)
await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
{
eventStreamOffset++;
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
foreach (var e in commit.Events)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
eventStreamOffset++;
await callback(new StoredEvent(eventToken, eventStreamOffset, eventData));
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
commitOffset++;
await callback(new StoredEvent(eventToken, eventStreamOffset, eventData));
commitOffset++;
}
}
}
}, ct);
}, ct);
}
}
private static FilterDefinition<MongoEventCommit> CreateFilter(string property, object value, StreamPosition streamPosition)

74
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs

@ -11,6 +11,7 @@ using System.Reactive.Linq;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.EventSourcing
{
@ -26,58 +27,61 @@ namespace Squidex.Infrastructure.EventSourcing
public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion));
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
if (events.Count == 0)
using (Profiler.TraceMethod<MongoEventStore>())
{
return;
}
var currentVersion = await GetEventStreamOffset(streamName);
Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion));
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
if (events.Count == 0)
{
return;
}
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events);
var currentVersion = await GetEventStreamOffset(streamName);
for (var attempt = 0; attempt < MaxWriteAttempts; attempt++)
{
try
if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion)
{
await Collection.InsertOneAsync(commit);
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
notifier.NotifyEventsStored(streamName);
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events);
return;
}
catch (MongoWriteException ex)
for (var attempt = 0; attempt < MaxWriteAttempts; attempt++)
{
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
try
{
currentVersion = await GetEventStreamOffset(streamName);
await Collection.InsertOneAsync(commit);
if (expectedVersion != EtagVersion.Any)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
notifier.NotifyEventsStored(streamName);
if (attempt < MaxWriteAttempts)
return;
}
catch (MongoWriteException ex)
{
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
expectedVersion = currentVersion;
currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != EtagVersion.Any)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
if (attempt < MaxWriteAttempts)
{
expectedVersion = currentVersion;
}
else
{
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time.");
}
}
else
{
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time.");
throw;
}
}
else
{
throw;
}
}
}
}

34
src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs

@ -10,6 +10,7 @@ using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
using Newtonsoft.Json;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb;
namespace Squidex.Infrastructure.States
@ -37,26 +38,35 @@ namespace Squidex.Infrastructure.States
public async Task<(T Value, long Version)> ReadAsync(TKey key)
{
var existing =
await Collection.Find(x => x.Id.Equals(key))
.FirstOrDefaultAsync();
if (existing != null)
using (Profiler.TraceMethod<MongoSnapshotStore<T, TKey>>())
{
return (existing.Doc, existing.Version);
}
var existing =
await Collection.Find(x => x.Id.Equals(key))
.FirstOrDefaultAsync();
return (default(T), EtagVersion.NotFound);
if (existing != null)
{
return (existing.Doc, existing.Version);
}
return (default(T), EtagVersion.NotFound);
}
}
public Task WriteAsync(TKey key, T value, long oldVersion, long newVersion)
public async Task WriteAsync(TKey key, T value, long oldVersion, long newVersion)
{
return Collection.UpsertVersionedAsync(key, oldVersion, newVersion, u => u.Set(x => x.Doc, value));
using (Profiler.TraceMethod<MongoSnapshotStore<T, TKey>>())
{
await Collection.UpsertVersionedAsync(key, oldVersion, newVersion, u => u.Set(x => x.Doc, value));
}
}
public Task ReadAllAsync(System.Func<T, long, Task> callback)
public async Task ReadAllAsync(System.Func<T, long, Task> callback)
{
return Collection.Find(new BsonDocument()).ForEachAsync(x => callback(x.Doc, x.Version));
using (Profiler.TraceMethod<MongoSnapshotStore<T, TKey>>())
{
return Collection.Find(new BsonDocument()).ForEachAsync(x => callback(x.Doc, x.Version));
}
}
}
}

Loading…
Cancel
Save