diff --git a/backend/src/Migrations/RebuilderExtensions.cs b/backend/src/Migrations/RebuilderExtensions.cs index 472694ad1..ee42014af 100644 --- a/backend/src/Migrations/RebuilderExtensions.cs +++ b/backend/src/Migrations/RebuilderExtensions.cs @@ -18,40 +18,42 @@ namespace Migrations { public static class RebuilderExtensions { + private const double AllowedErrorRate = 0.02; + public static Task RebuildAppsAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^app\\-", batchSize, ct); + return rebuilder.RebuildAsync("^app\\-", batchSize, AllowedErrorRate, ct); } public static Task RebuildSchemasAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^schema\\-", batchSize, ct); + return rebuilder.RebuildAsync("^schema\\-", batchSize, AllowedErrorRate, ct); } public static Task RebuildRulesAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^rule\\-", batchSize, ct); + return rebuilder.RebuildAsync("^rule\\-", batchSize, AllowedErrorRate, ct); } public static Task RebuildAssetsAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^asset\\-", batchSize, ct); + return rebuilder.RebuildAsync("^asset\\-", batchSize, AllowedErrorRate, ct); } public static Task RebuildAssetFoldersAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^assetFolder\\-", batchSize, ct); + return rebuilder.RebuildAsync("^assetFolder\\-", batchSize, AllowedErrorRate, ct); } public static Task RebuildContentAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^content\\-", batchSize, ct); + return rebuilder.RebuildAsync("^content\\-", batchSize, AllowedErrorRate, ct); } } } diff --git a/backend/src/Squidex.Domain.Apps.Core.Operations/ExtractReferenceIds/ContentReferencesExtensions.cs b/backend/src/Squidex.Domain.Apps.Core.Operations/ExtractReferenceIds/ContentReferencesExtensions.cs index 3cff3719c..7db5c4cde 100644 --- a/backend/src/Squidex.Domain.Apps.Core.Operations/ExtractReferenceIds/ContentReferencesExtensions.cs +++ b/backend/src/Squidex.Domain.Apps.Core.Operations/ExtractReferenceIds/ContentReferencesExtensions.cs @@ -6,6 +6,7 @@ // ========================================================================== using System.Collections.Generic; +using System.Linq; using System.Text; using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Core.Schemas; @@ -16,6 +17,37 @@ namespace Squidex.Domain.Apps.Core.ExtractReferenceIds { public static class ContentReferencesExtensions { + public static bool CanHaveReference(this ContentData source) + { + if (source.Count == 0) + { + return false; + } + + static bool CanHaveReference(IJsonValue value) + { + if (value is JsonArray) + { + return true; + } + + if (value is JsonObject obj) + { + foreach (var nested in obj.Values) + { + if (CanHaveReference(nested)) + { + return true; + } + } + } + + return false; + } + + return source.Values.NotNull().SelectMany(x => x.Values).Any(CanHaveReference); + } + public static HashSet GetReferencedIds(this ContentData source, Schema schema, ResolvedComponents components, int referencesPerField = int.MaxValue) { diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs index 7fbf8027c..018bbc4b9 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs @@ -94,6 +94,12 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents foreach (var (_, value, version) in snapshots) { + // Some data is corrupt and might throw an exception during migration if we do not skip them. + if (value.AppId == null || value.CurrentVersion == null) + { + continue; + } + if (ShouldWritePublished(value)) { entitiesPublished.Add(await CreatePublishedContentAsync(value, version)); @@ -177,19 +183,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents entity.IndexedSchemaId = value.SchemaId.Id; entity.Version = newVersion; - var schema = await appProvider.GetSchemaAsync(value.AppId.Id, value.SchemaId.Id, true); - - if (schema != null) + if (data.CanHaveReference()) { - var components = await appProvider.GetComponentsAsync(schema); + var schema = await appProvider.GetSchemaAsync(value.AppId.Id, value.SchemaId.Id, true); - entity.ReferencedIds = entity.Data.GetReferencedIds(schema.SchemaDef, components); - } - else - { - entity.ReferencedIds = new HashSet(); + if (schema != null) + { + var components = await appProvider.GetComponentsAsync(schema); + + entity.ReferencedIds = entity.Data.GetReferencedIds(schema.SchemaDef, components); + } } + entity.ReferencedIds ??= new HashSet(); + return entity; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Comments/IWatchingService.cs b/backend/src/Squidex.Domain.Apps.Entities/Comments/IWatchingService.cs index 9e7c158f5..c88c01405 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Comments/IWatchingService.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Comments/IWatchingService.cs @@ -5,8 +5,8 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using Squidex.Infrastructure; using System.Threading.Tasks; +using Squidex.Infrastructure; namespace Squidex.Domain.Apps.Entities.Comments { diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index 8dd377ca7..10ec74518 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -16,6 +16,7 @@ using Squidex.Caching; using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.States; using Squidex.Infrastructure.Tasks; +using Squidex.Log; #pragma warning disable RECS0108 // Warns about static fields in generic types @@ -26,6 +27,7 @@ namespace Squidex.Infrastructure.Commands private readonly ILocalCache localCache; private readonly IEventStore eventStore; private readonly IServiceProvider serviceProvider; + private readonly ISemanticLog log; private static class Factory where T : DomainObject where TState : class, IDomainState, new() { @@ -40,14 +42,23 @@ namespace Squidex.Infrastructure.Commands public Rebuilder( ILocalCache localCache, IEventStore eventStore, - IServiceProvider serviceProvider) + IServiceProvider serviceProvider, + ISemanticLog log) { this.eventStore = eventStore; this.serviceProvider = serviceProvider; + this.log = log; this.localCache = localCache; } - public virtual async Task RebuildAsync(string filter, int batchSize, + public virtual Task RebuildAsync(string filter, int batchSize, + CancellationToken ct = default) + where T : DomainObject where TState : class, IDomainState, new() + { + return RebuildAsync(filter, batchSize, 0, ct); + } + + public virtual async Task RebuildAsync(string filter, int batchSize, double errorThreshold, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() { @@ -55,10 +66,17 @@ namespace Squidex.Infrastructure.Commands var ids = eventStore.QueryAllAsync(filter, ct: ct).Select(x => x.Data.Headers.AggregateId()); - await InsertManyAsync(ids, batchSize, ct); + await InsertManyAsync(ids, batchSize, errorThreshold, ct); + } + + public virtual Task InsertManyAsync(IEnumerable source, int batchSize, + CancellationToken ct = default) + where T : DomainObject where TState : class, IDomainState, new() + { + return InsertManyAsync(source, batchSize, 0, ct); } - public virtual async Task InsertManyAsync(IEnumerable source, int batchSize, + public virtual async Task InsertManyAsync(IEnumerable source, int batchSize, double errorThreshold = 0, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() { @@ -66,10 +84,10 @@ namespace Squidex.Infrastructure.Commands var ids = source.ToAsyncEnumerable(); - await InsertManyAsync(ids, batchSize, ct); + await InsertManyAsync(ids, batchSize, errorThreshold, ct); } - private async Task InsertManyAsync(IAsyncEnumerable source, int batchSize, + private async Task InsertManyAsync(IAsyncEnumerable source, int batchSize, double errorThreshold, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() { @@ -77,55 +95,65 @@ namespace Squidex.Infrastructure.Commands var parallelism = Environment.ProcessorCount; - var workerBlock = new ActionBlock(async ids => + var handledIds = new HashSet(); + var handlerErrors = 0; + + using (localCache.StartContext()) { - try + var workerBlock = new ActionBlock(async ids => { - await using (var context = store.WithBatchContext(typeof(T))) + try { - await context.LoadAsync(ids); - - foreach (var id in ids) + await using (var context = store.WithBatchContext(typeof(T))) { - try - { - var domainObject = Factory.Create(serviceProvider, context); - - domainObject.Setup(id); + await context.LoadAsync(ids); - await domainObject.RebuildStateAsync(); - } - catch (DomainObjectNotFoundException) + foreach (var id in ids) { - return; + try + { + var domainObject = Factory.Create(serviceProvider, context); + + domainObject.Setup(id); + + await domainObject.RebuildStateAsync(); + } + catch (DomainObjectNotFoundException) + { + return; + } + catch (Exception ex) + { + log.LogWarning(ex, w => w + .WriteProperty("reason", "CorruptData") + .WriteProperty("domainObjectId", id.ToString()) + .WriteProperty("domainObjectType", typeof(T).Name)); + + Interlocked.Increment(ref handlerErrors); + } } } } - } - catch (OperationCanceledException ex) + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } + }, + new ExecutionDataflowBlockOptions { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, - new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = parallelism, - MaxMessagesPerTask = 10, - BoundedCapacity = parallelism - }); - - var batchBlock = new BatchBlock(batchSize, new GroupingDataflowBlockOptions - { - BoundedCapacity = batchSize - }); + MaxDegreeOfParallelism = parallelism, + MaxMessagesPerTask = 10, + BoundedCapacity = parallelism + }); - batchBlock.BidirectionalLinkTo(workerBlock); + var batchBlock = new BatchBlock(batchSize, new GroupingDataflowBlockOptions + { + BoundedCapacity = batchSize + }); - var handledIds = new HashSet(); + batchBlock.BidirectionalLinkTo(workerBlock); - using (localCache.StartContext()) - { await foreach (var id in source.WithCancellation(ct)) { if (handledIds.Add(id)) @@ -138,9 +166,16 @@ namespace Squidex.Infrastructure.Commands } batchBlock.Complete(); + + await workerBlock.Completion; } - await workerBlock.Completion; + var errorRate = (double)handlerErrors / handledIds.Count; + + if (errorRate >= errorThreshold) + { + throw new InvalidOperationException($"Error rate of {errorRate} is above threshold {errorThreshold}."); + } } private async Task ClearAsync() where TState : class, IDomainState, new()