Browse Source

New migrator.

pull/221/head
Sebastian Stehle 8 years ago
parent
commit
7c86131396
  1. 8
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs
  2. 6
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs
  3. 3
      src/Squidex.Infrastructure/Migrations/IMigration.cs
  4. 12
      src/Squidex.Infrastructure/Migrations/Migrator.cs
  5. 2
      src/Squidex.Infrastructure/States/ISnapshotStore.cs
  6. 6
      src/Squidex/Config/Domain/WriteServices.cs
  7. 31
      tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs
  8. 147
      tools/Migrate_01/Migration01_FromCqrs.cs
  9. 3
      tools/Migrate_01/Migration02_AddPatterns.cs
  10. 36
      tools/Migrate_01/Migration03_SplitContentCollections.cs
  11. 157
      tools/Migrate_01/Rebuilder.cs

8
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs

@ -10,6 +10,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.OData.UriParser;
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Entities.Apps;
@ -144,5 +145,12 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
return contentEntity;
}
public override async Task ClearAsync()
{
await ArchiveCollection.DeleteManyAsync(new BsonDocument());
await base.ClearAsync();
}
}
}

6
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs

@ -10,7 +10,6 @@ using Squidex.Domain.Apps.Events.Assets;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Infrastructure.Dispatching;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
{
@ -26,11 +25,6 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
get { return "^(content-)|(asset-)"; }
}
public override Task ClearAsync()
{
return TaskHelper.Done;
}
public Task On(Envelope<IEvent> @event)
{
return this.DispatchActionAsync(@event.Payload);

3
src/Squidex.Infrastructure/Migrations/IMigration.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.Migrations
@ -15,6 +16,6 @@ namespace Squidex.Infrastructure.Migrations
int ToVersion { get; }
Task UpdateAsync();
Task UpdateAsync(IEnumerable<IMigration> previousMigrations);
}
}

12
src/Squidex.Infrastructure/Migrations/Migrator.cs

@ -56,9 +56,11 @@ namespace Squidex.Infrastructure.Migrations
{
var migrationPath = FindMigratorPath(version, lastMigrator.ToVersion).ToList();
foreach (var migrator in migrationPath)
var previousMigrations = new List<IMigration>();
foreach (var migration in migrationPath)
{
var name = migrator.GetType().ToString();
var name = migration.GetType().ToString();
log.LogInformation(w => w
.WriteProperty("action", "Migration")
@ -70,10 +72,12 @@ namespace Squidex.Infrastructure.Migrations
.WriteProperty("status", "Completed")
.WriteProperty("migrator", name)))
{
await migrator.UpdateAsync();
await migration.UpdateAsync(previousMigrations.ToList());
version = migrator.ToVersion;
version = migration.ToVersion;
}
previousMigrations.Add(migration);
}
}
}

2
src/Squidex.Infrastructure/States/ISnapshotStore.cs

@ -14,5 +14,7 @@ namespace Squidex.Infrastructure.States
Task WriteAsync(TKey key, T value, long oldVersion, long newVersion);
Task<(T Value, long Version)> ReadAsync(TKey key);
Task ClearAsync();
}
}

6
src/Squidex/Config/Domain/WriteServices.cs

@ -69,6 +69,12 @@ namespace Squidex.Config.Domain
services.AddTransientAs<Migration02_AddPatterns>()
.As<IMigration>();
services.AddTransientAs<Migration03_SplitContentCollections>()
.As<IMigration>();
services.AddTransientAs<Rebuilder>()
.AsSelf();
services.AddTransientAs<AppDomainObject>()
.AsSelf();

31
tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FakeItEasy;
@ -78,9 +79,9 @@ namespace Squidex.Infrastructure.Migrations
await migrator.MigrateAsync();
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.That.IsEmpty())).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_1))).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_1, migrator_1_2))).MustHaveHappened();
A.CallTo(() => status.UnlockAsync(3)).MustHaveHappened();
}
@ -94,13 +95,13 @@ namespace Squidex.Infrastructure.Migrations
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log);
A.CallTo(() => migrator_1_2.UpdateAsync()).Throws(new ArgumentException());
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).Throws(new ArgumentException());
await Assert.ThrowsAsync<ArgumentException>(migrator.MigrateAsync);
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync()).MustNotHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.That.IsEmpty())).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_1))).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => status.UnlockAsync(1)).MustHaveHappened();
}
@ -117,10 +118,10 @@ namespace Squidex.Infrastructure.Migrations
await migrator.MigrateAsync();
A.CallTo(() => migrator_0_2.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync()).MustNotHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync()).MustNotHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_0_2.UpdateAsync(A<IEnumerable<IMigration>>.That.IsEmpty())).MustHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_2))).MustHaveHappened();
A.CallTo(() => status.UnlockAsync(3)).MustHaveHappened();
}
@ -135,8 +136,8 @@ namespace Squidex.Infrastructure.Migrations
await Assert.ThrowsAsync<InvalidOperationException>(migrator.MigrateAsync);
A.CallTo(() => migrator_0_1.UpdateAsync()).MustNotHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync()).MustNotHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => status.UnlockAsync(0)).MustHaveHappened();
}
@ -151,8 +152,8 @@ namespace Squidex.Infrastructure.Migrations
await Task.WhenAll(Enumerable.Repeat(0, 10).Select(x => Task.Run(migrator.MigrateAsync)));
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustHaveHappened(Repeated.Exactly.Once);
}
private IMigration BuildMigration(int fromVersion, int toVersion)

147
tools/Migrate_01/Migration01_FromCqrs.cs

@ -5,163 +5,30 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.Schemas;
using Squidex.Domain.Apps.Entities.Apps;
using Squidex.Domain.Apps.Entities.Assets;
using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Domain.Apps.Entities.Rules;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Assets;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Domain.Apps.Events.Rules;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Migrations;
using Squidex.Infrastructure.States;
namespace Migrate_01
{
public sealed class Migration01_FromCqrs : IMigration
{
private readonly FieldRegistry fieldRegistry;
private readonly IEventStore eventStore;
private readonly IEventDataFormatter eventDataFormatter;
private readonly ISnapshotStore<ContentState, Guid> snapshotStore;
private readonly IStateFactory stateFactory;
private readonly Rebuilder rebuilder;
public int FromVersion { get; } = 0;
public int ToVersion { get; } = 1;
public Migration01_FromCqrs(
FieldRegistry fieldRegistry,
IEventDataFormatter eventDataFormatter,
IEventStore eventStore,
ISnapshotStore<ContentState, Guid> snapshotStore,
IStateFactory stateFactory)
public Migration01_FromCqrs(Rebuilder rebuilder)
{
this.fieldRegistry = fieldRegistry;
this.eventDataFormatter = eventDataFormatter;
this.eventStore = eventStore;
this.snapshotStore = snapshotStore;
this.stateFactory = stateFactory;
this.rebuilder = rebuilder;
}
public async Task UpdateAsync()
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
{
await MigrateConfigAsync();
await MigrateContentAsync();
await MigrateAssetsAsync();
}
private Task MigrateAssetsAsync()
{
const string filter = "^asset\\-";
var handledIds = new HashSet<Guid>();
return eventStore.GetEventsAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
{
if (@event.Payload is AssetEvent assetEvent && handledIds.Add(assetEvent.AssetId))
{
var asset = await stateFactory.CreateAsync<AssetDomainObject>(assetEvent.AssetId);
asset.ApplySnapshot(asset.Snapshot.Apply(@event));
await asset.WriteSnapshotAsync();
}
}
}, CancellationToken.None, filter);
}
private Task MigrateConfigAsync()
{
const string filter = "^((app\\-)|(schema\\-)|(rule\\-))";
var handledIds = new HashSet<Guid>();
return eventStore.GetEventsAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
{
if (@event.Payload is SchemaEvent schemaEvent && handledIds.Add(schemaEvent.SchemaId.Id))
{
var schema = await stateFactory.GetSingleAsync<SchemaDomainObject>(schemaEvent.SchemaId.Id);
await schema.WriteSnapshotAsync();
}
else if (@event.Payload is RuleEvent ruleEvent && handledIds.Add(ruleEvent.RuleId))
{
var rule = await stateFactory.GetSingleAsync<RuleDomainObject>(ruleEvent.RuleId);
await rule.WriteSnapshotAsync();
}
else if (@event.Payload is AppEvent appEvent && handledIds.Add(appEvent.AppId.Id))
{
var app = await stateFactory.GetSingleAsync<AppDomainObject>(appEvent.AppId.Id);
await app.WriteSnapshotAsync();
}
}
}, CancellationToken.None, filter);
}
private Task MigrateContentAsync()
{
const string filter = "^((content\\-))";
var handledIds = new HashSet<Guid>();
return eventStore.GetEventsAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event.Payload is ContentEvent contentEvent)
{
try
{
var (content, version) = await snapshotStore.ReadAsync(contentEvent.ContentId);
if (content == null)
{
version = EtagVersion.Empty;
content = new ContentState();
}
content = content.Apply(@event);
await snapshotStore.WriteAsync(contentEvent.ContentId, content, version, version + 1);
}
catch (DomainObjectNotFoundException)
{
// Schema has been deleted.
}
}
}, CancellationToken.None, filter);
}
private Envelope<IEvent> ParseKnownEvent(StoredEvent storedEvent)
{
try
{
return eventDataFormatter.Parse(storedEvent.Data);
}
catch (TypeNameNotFoundException)
{
return null;
}
await rebuilder.RebuildConfigAsync();
await rebuilder.RebuildContentAsync();
await rebuilder.RebuildAssetsAsync();
}
}
}

3
tools/Migrate_01/Migration02_AddPatterns.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Entities.Apps;
using Squidex.Domain.Apps.Entities.Apps.Commands;
@ -33,7 +34,7 @@ namespace Migrate_01
this.stateFactory = stateFactory;
}
public async Task UpdateAsync()
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
{
var ids = await appRepository.QueryAppIdsAsync();

36
tools/Migrate_01/Migration03_SplitContentCollections.cs

@ -0,0 +1,36 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.Migrations;
namespace Migrate_01
{
public class Migration03_SplitContentCollections : IMigration
{
private readonly Rebuilder rebuilder;
public int FromVersion { get; } = 2;
public int ToVersion { get; } = 3;
public Migration03_SplitContentCollections(Rebuilder rebuilder)
{
this.rebuilder = rebuilder;
}
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
{
if (!previousMigrations.Any(x => x is Migration01_FromCqrs))
{
await rebuilder.RebuildContentAsync();
}
}
}
}

157
tools/Migrate_01/Rebuilder.cs

@ -0,0 +1,157 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.Schemas;
using Squidex.Domain.Apps.Entities.Apps;
using Squidex.Domain.Apps.Entities.Assets;
using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Domain.Apps.Entities.Rules;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Assets;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Domain.Apps.Events.Rules;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States;
namespace Migrate_01
{
public sealed class Rebuilder
{
private readonly FieldRegistry fieldRegistry;
private readonly IEventStore eventStore;
private readonly IEventDataFormatter eventDataFormatter;
private readonly ISnapshotStore<ContentState, Guid> snapshotContentStore;
private readonly IStateFactory stateFactory;
public Rebuilder(
FieldRegistry fieldRegistry,
IEventDataFormatter eventDataFormatter,
IEventStore eventStore,
ISnapshotStore<ContentState, Guid> snapshotContentStore,
IStateFactory stateFactory)
{
this.fieldRegistry = fieldRegistry;
this.eventDataFormatter = eventDataFormatter;
this.eventStore = eventStore;
this.snapshotContentStore = snapshotContentStore;
this.stateFactory = stateFactory;
}
public Task RebuildAssetsAsync()
{
const string filter = "^asset\\-";
var handledIds = new HashSet<Guid>();
return eventStore.GetEventsAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
{
if (@event.Payload is AssetEvent assetEvent && handledIds.Add(assetEvent.AssetId))
{
var asset = await stateFactory.CreateAsync<AssetDomainObject>(assetEvent.AssetId);
asset.ApplySnapshot(asset.Snapshot.Apply(@event));
await asset.WriteSnapshotAsync();
}
}
}, CancellationToken.None, filter);
}
public Task RebuildConfigAsync()
{
const string filter = "^((app\\-)|(schema\\-)|(rule\\-))";
var handledIds = new HashSet<Guid>();
return eventStore.GetEventsAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
{
if (@event.Payload is SchemaEvent schemaEvent && handledIds.Add(schemaEvent.SchemaId.Id))
{
var schema = await stateFactory.GetSingleAsync<SchemaDomainObject>(schemaEvent.SchemaId.Id);
await schema.WriteSnapshotAsync();
}
else if (@event.Payload is RuleEvent ruleEvent && handledIds.Add(ruleEvent.RuleId))
{
var rule = await stateFactory.GetSingleAsync<RuleDomainObject>(ruleEvent.RuleId);
await rule.WriteSnapshotAsync();
}
else if (@event.Payload is AppEvent appEvent && handledIds.Add(appEvent.AppId.Id))
{
var app = await stateFactory.GetSingleAsync<AppDomainObject>(appEvent.AppId.Id);
await app.WriteSnapshotAsync();
}
}
}, CancellationToken.None, filter);
}
public async Task RebuildContentAsync()
{
const string filter = "^((content\\-))";
var handledIds = new HashSet<Guid>();
await snapshotContentStore.ClearAsync();
await eventStore.GetEventsAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event.Payload is ContentEvent contentEvent)
{
try
{
var (content, version) = await snapshotContentStore.ReadAsync(contentEvent.ContentId);
if (content == null)
{
version = EtagVersion.Empty;
content = new ContentState();
}
content = content.Apply(@event);
await snapshotContentStore.WriteAsync(contentEvent.ContentId, content, version, version + 1);
}
catch (DomainObjectNotFoundException)
{
// Schema has been deleted.
}
}
}, CancellationToken.None, filter);
}
private Envelope<IEvent> ParseKnownEvent(StoredEvent storedEvent)
{
try
{
return eventDataFormatter.Parse(storedEvent.Data);
}
catch (TypeNameNotFoundException)
{
return null;
}
}
}
}
Loading…
Cancel
Save