Browse Source

Fix/migration (#783)

* Temp

* Accept an error threshold.

* Check error rate for each item.

* Get rid of useless variable.
pull/785/head
Sebastian Stehle 4 years ago
committed by GitHub
parent
commit
d338fbb14a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      backend/src/Migrations/RebuilderExtensions.cs
  2. 32
      backend/src/Squidex.Domain.Apps.Core.Operations/ExtractReferenceIds/ContentReferencesExtensions.cs
  3. 25
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs
  4. 2
      backend/src/Squidex.Domain.Apps.Entities/Comments/IWatchingService.cs
  5. 121
      backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs

14
backend/src/Migrations/RebuilderExtensions.cs

@ -18,40 +18,42 @@ namespace Migrations
{
public static class RebuilderExtensions
{
private const double AllowedErrorRate = 0.02;
public static Task RebuildAppsAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<AppDomainObject, AppDomainObject.State>("^app\\-", batchSize, ct);
return rebuilder.RebuildAsync<AppDomainObject, AppDomainObject.State>("^app\\-", batchSize, AllowedErrorRate, ct);
}
public static Task RebuildSchemasAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<SchemaDomainObject, SchemaDomainObject.State>("^schema\\-", batchSize, ct);
return rebuilder.RebuildAsync<SchemaDomainObject, SchemaDomainObject.State>("^schema\\-", batchSize, AllowedErrorRate, ct);
}
public static Task RebuildRulesAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<RuleDomainObject, RuleDomainObject.State>("^rule\\-", batchSize, ct);
return rebuilder.RebuildAsync<RuleDomainObject, RuleDomainObject.State>("^rule\\-", batchSize, AllowedErrorRate, ct);
}
public static Task RebuildAssetsAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<AssetDomainObject, AssetDomainObject.State>("^asset\\-", batchSize, ct);
return rebuilder.RebuildAsync<AssetDomainObject, AssetDomainObject.State>("^asset\\-", batchSize, AllowedErrorRate, ct);
}
public static Task RebuildAssetFoldersAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<AssetFolderDomainObject, AssetFolderDomainObject.State>("^assetFolder\\-", batchSize, ct);
return rebuilder.RebuildAsync<AssetFolderDomainObject, AssetFolderDomainObject.State>("^assetFolder\\-", batchSize, AllowedErrorRate, ct);
}
public static Task RebuildContentAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<ContentDomainObject, ContentDomainObject.State>("^content\\-", batchSize, ct);
return rebuilder.RebuildAsync<ContentDomainObject, ContentDomainObject.State>("^content\\-", batchSize, AllowedErrorRate, ct);
}
}
}

32
backend/src/Squidex.Domain.Apps.Core.Operations/ExtractReferenceIds/ContentReferencesExtensions.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Core.Schemas;
@ -16,6 +17,37 @@ namespace Squidex.Domain.Apps.Core.ExtractReferenceIds
{
public static class ContentReferencesExtensions
{
public static bool CanHaveReference(this ContentData source)
{
if (source.Count == 0)
{
return false;
}
static bool CanHaveReference(IJsonValue value)
{
if (value is JsonArray)
{
return true;
}
if (value is JsonObject obj)
{
foreach (var nested in obj.Values)
{
if (CanHaveReference(nested))
{
return true;
}
}
}
return false;
}
return source.Values.NotNull().SelectMany(x => x.Values).Any(CanHaveReference);
}
public static HashSet<DomainId> GetReferencedIds(this ContentData source, Schema schema,
ResolvedComponents components, int referencesPerField = int.MaxValue)
{

25
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs

@ -94,6 +94,12 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
foreach (var (_, value, version) in snapshots)
{
// Some data is corrupt and might throw an exception during migration if we do not skip them.
if (value.AppId == null || value.CurrentVersion == null)
{
continue;
}
if (ShouldWritePublished(value))
{
entitiesPublished.Add(await CreatePublishedContentAsync(value, version));
@ -177,19 +183,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
entity.IndexedSchemaId = value.SchemaId.Id;
entity.Version = newVersion;
var schema = await appProvider.GetSchemaAsync(value.AppId.Id, value.SchemaId.Id, true);
if (schema != null)
if (data.CanHaveReference())
{
var components = await appProvider.GetComponentsAsync(schema);
var schema = await appProvider.GetSchemaAsync(value.AppId.Id, value.SchemaId.Id, true);
entity.ReferencedIds = entity.Data.GetReferencedIds(schema.SchemaDef, components);
}
else
{
entity.ReferencedIds = new HashSet<DomainId>();
if (schema != null)
{
var components = await appProvider.GetComponentsAsync(schema);
entity.ReferencedIds = entity.Data.GetReferencedIds(schema.SchemaDef, components);
}
}
entity.ReferencedIds ??= new HashSet<DomainId>();
return entity;
}

2
backend/src/Squidex.Domain.Apps.Entities/Comments/IWatchingService.cs

@ -5,8 +5,8 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Infrastructure;
using System.Threading.Tasks;
using Squidex.Infrastructure;
namespace Squidex.Domain.Apps.Entities.Comments
{

121
backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs

@ -16,6 +16,7 @@ using Squidex.Caching;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
using Squidex.Log;
#pragma warning disable RECS0108 // Warns about static fields in generic types
@ -26,6 +27,7 @@ namespace Squidex.Infrastructure.Commands
private readonly ILocalCache localCache;
private readonly IEventStore eventStore;
private readonly IServiceProvider serviceProvider;
private readonly ISemanticLog log;
private static class Factory<T, TState> where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{
@ -40,14 +42,23 @@ namespace Squidex.Infrastructure.Commands
public Rebuilder(
ILocalCache localCache,
IEventStore eventStore,
IServiceProvider serviceProvider)
IServiceProvider serviceProvider,
ISemanticLog log)
{
this.eventStore = eventStore;
this.serviceProvider = serviceProvider;
this.log = log;
this.localCache = localCache;
}
public virtual async Task RebuildAsync<T, TState>(string filter, int batchSize,
public virtual Task RebuildAsync<T, TState>(string filter, int batchSize,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{
return RebuildAsync<T, TState>(filter, batchSize, 0, ct);
}
public virtual async Task RebuildAsync<T, TState>(string filter, int batchSize, double errorThreshold,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{
@ -55,10 +66,17 @@ namespace Squidex.Infrastructure.Commands
var ids = eventStore.QueryAllAsync(filter, ct: ct).Select(x => x.Data.Headers.AggregateId());
await InsertManyAsync<T, TState>(ids, batchSize, ct);
await InsertManyAsync<T, TState>(ids, batchSize, errorThreshold, ct);
}
public virtual Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, int batchSize,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{
return InsertManyAsync<T, TState>(source, batchSize, 0, ct);
}
public virtual async Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, int batchSize,
public virtual async Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, int batchSize, double errorThreshold = 0,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{
@ -66,10 +84,10 @@ namespace Squidex.Infrastructure.Commands
var ids = source.ToAsyncEnumerable();
await InsertManyAsync<T, TState>(ids, batchSize, ct);
await InsertManyAsync<T, TState>(ids, batchSize, errorThreshold, ct);
}
private async Task InsertManyAsync<T, TState>(IAsyncEnumerable<DomainId> source, int batchSize,
private async Task InsertManyAsync<T, TState>(IAsyncEnumerable<DomainId> source, int batchSize, double errorThreshold,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{
@ -77,55 +95,65 @@ namespace Squidex.Infrastructure.Commands
var parallelism = Environment.ProcessorCount;
var workerBlock = new ActionBlock<DomainId[]>(async ids =>
var handledIds = new HashSet<DomainId>();
var handlerErrors = 0;
using (localCache.StartContext())
{
try
var workerBlock = new ActionBlock<DomainId[]>(async ids =>
{
await using (var context = store.WithBatchContext(typeof(T)))
try
{
await context.LoadAsync(ids);
foreach (var id in ids)
await using (var context = store.WithBatchContext(typeof(T)))
{
try
{
var domainObject = Factory<T, TState>.Create(serviceProvider, context);
domainObject.Setup(id);
await context.LoadAsync(ids);
await domainObject.RebuildStateAsync();
}
catch (DomainObjectNotFoundException)
foreach (var id in ids)
{
return;
try
{
var domainObject = Factory<T, TState>.Create(serviceProvider, context);
domainObject.Setup(id);
await domainObject.RebuildStateAsync();
}
catch (DomainObjectNotFoundException)
{
return;
}
catch (Exception ex)
{
log.LogWarning(ex, w => w
.WriteProperty("reason", "CorruptData")
.WriteProperty("domainObjectId", id.ToString())
.WriteProperty("domainObjectType", typeof(T).Name));
Interlocked.Increment(ref handlerErrors);
}
}
}
}
}
catch (OperationCanceledException ex)
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
},
new ExecutionDataflowBlockOptions
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = parallelism,
MaxMessagesPerTask = 10,
BoundedCapacity = parallelism
});
var batchBlock = new BatchBlock<DomainId>(batchSize, new GroupingDataflowBlockOptions
{
BoundedCapacity = batchSize
});
MaxDegreeOfParallelism = parallelism,
MaxMessagesPerTask = 10,
BoundedCapacity = parallelism
});
batchBlock.BidirectionalLinkTo(workerBlock);
var batchBlock = new BatchBlock<DomainId>(batchSize, new GroupingDataflowBlockOptions
{
BoundedCapacity = batchSize
});
var handledIds = new HashSet<DomainId>();
batchBlock.BidirectionalLinkTo(workerBlock);
using (localCache.StartContext())
{
await foreach (var id in source.WithCancellation(ct))
{
if (handledIds.Add(id))
@ -138,9 +166,16 @@ namespace Squidex.Infrastructure.Commands
}
batchBlock.Complete();
await workerBlock.Completion;
}
await workerBlock.Completion;
var errorRate = (double)handlerErrors / handledIds.Count;
if (errorRate >= errorThreshold)
{
throw new InvalidOperationException($"Error rate of {errorRate} is above threshold {errorThreshold}.");
}
}
private async Task ClearAsync<TState>() where TState : class, IDomainState<TState>, new()

Loading…
Cancel
Save