From 7d6364325358ad19701110ba7e7d6979972ec4c3 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sat, 16 Dec 2017 19:13:45 +0100 Subject: [PATCH] Migrator fixed for multi server scenario. --- .../Migrations/MongoMigrationStatus.cs | 21 ++---- .../Migrations/Migrator.cs | 26 ++++---- .../Migrations/MigratorTests.cs | 66 +++++++++++++++++-- 3 files changed, 82 insertions(+), 31 deletions(-) diff --git a/src/Squidex.Infrastructure.MongoDb/Migrations/MongoMigrationStatus.cs b/src/Squidex.Infrastructure.MongoDb/Migrations/MongoMigrationStatus.cs index b9abb0515..7f96b8737 100644 --- a/src/Squidex.Infrastructure.MongoDb/Migrations/MongoMigrationStatus.cs +++ b/src/Squidex.Infrastructure.MongoDb/Migrations/MongoMigrationStatus.cs @@ -15,6 +15,7 @@ namespace Squidex.Infrastructure.Migrations public sealed class MongoMigrationStatus : MongoRepositoryBase, IMigrationStatus { private const string DefaultId = "Default"; + private static readonly FindOneAndUpdateOptions UpsertFind = new FindOneAndUpdateOptions { IsUpsert = true }; public MongoMigrationStatus(IMongoDatabase database) : base(database) @@ -30,27 +31,15 @@ namespace Squidex.Infrastructure.Migrations { var entity = await Collection.Find(x => x.Id == DefaultId).FirstOrDefaultAsync(); - if (entity == null) - { - try - { - await Collection.InsertOneAsync(new MongoMigrationEntity { Id = DefaultId }); - } - catch (MongoWriteException ex) - { - if (ex.WriteError.Category != ServerErrorCategory.DuplicateKey) - { - throw; - } - } - } - return entity?.Version ?? 0; } public async Task TryLockAsync() { - var entity = await Collection.FindOneAndUpdateAsync(x => x.Id == DefaultId, Update.Set(x => x.IsLocked, true)); + var entity = + await Collection.FindOneAndUpdateAsync(x => x.Id == DefaultId, + Update.Set(x => x.IsLocked, true), + UpsertFind); return entity?.IsLocked == false; } diff --git a/src/Squidex.Infrastructure/Migrations/Migrator.cs b/src/Squidex.Infrastructure/Migrations/Migrator.cs index 3ba7e29ce..127bc7fcd 100644 --- a/src/Squidex.Infrastructure/Migrations/Migrator.cs +++ b/src/Squidex.Infrastructure/Migrations/Migrator.cs @@ -20,6 +20,8 @@ namespace Squidex.Infrastructure.Migrations private readonly IEnumerable migrations; private readonly ISemanticLog log; + public int LockWaitMs { get; set; } = 5000; + public Migrator(IMigrationStatus migrationStatus, IEnumerable migrations, ISemanticLog log) { Guard.NotNull(migrationStatus, nameof(migrationStatus)); @@ -34,22 +36,24 @@ namespace Squidex.Infrastructure.Migrations public async Task MigrateAsync() { - var version = await migrationStatus.GetVersionAsync(); - - var lastMigrator = migrations.FirstOrDefault(); + var version = 0; - if (lastMigrator != null && lastMigrator.ToVersion != version) + try { while (!await migrationStatus.TryLockAsync()) { log.LogInformation(w => w .WriteProperty("action", "Migrate") - .WriteProperty("mesage", "Waiting 5sec to acquire lock.")); + .WriteProperty("mesage", $"Waiting {LockWaitMs}ms to acquire lock.")); - await Task.Delay(5000); + await Task.Delay(LockWaitMs); } - try + var lastMigrator = migrations.FirstOrDefault(); + + version = await migrationStatus.GetVersionAsync(); + + if (lastMigrator != null && lastMigrator.ToVersion != version) { var migrationPath = FindMigratorPath(version, lastMigrator.ToVersion).ToList(); @@ -73,10 +77,10 @@ namespace Squidex.Infrastructure.Migrations } } } - finally - { - await migrationStatus.UnlockAsync(version); - } + } + finally + { + await migrationStatus.UnlockAsync(version); } } diff --git a/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs b/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs index adf71d3a5..c70e10290 100644 --- a/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs @@ -7,9 +7,11 @@ // ========================================================================== using System; +using System.Linq; using System.Threading.Tasks; using FakeItEasy; using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.Tasks; using Xunit; namespace Squidex.Infrastructure.Migrations @@ -17,6 +19,48 @@ namespace Squidex.Infrastructure.Migrations public sealed class MigratorTests { private readonly IMigrationStatus status = A.Fake(); + private readonly ISemanticLog log = A.Fake(); + + public sealed class InMemoryStatus : IMigrationStatus + { + private readonly object lockObject = new object(); + private int version; + private bool isLocked; + + public Task GetVersionAsync() + { + return Task.FromResult(version); + } + + public Task TryLockAsync() + { + var lockAcquired = false; + + lock (lockObject) + { + if (!isLocked) + { + isLocked = true; + + lockAcquired = true; + } + } + + return Task.FromResult(lockAcquired); + } + + public Task UnlockAsync(int newVersion) + { + lock (lockObject) + { + isLocked = false; + + version = newVersion; + } + + return TaskHelper.Done; + } + } public MigratorTests() { @@ -31,7 +75,7 @@ namespace Squidex.Infrastructure.Migrations var migrator_1_2 = BuildMigration(1, 2); var migrator_2_3 = BuildMigration(2, 3); - var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, A.Fake()); + var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log); await migrator.MigrateAsync(); @@ -49,7 +93,7 @@ namespace Squidex.Infrastructure.Migrations var migrator_1_2 = BuildMigration(1, 2); var migrator_2_3 = BuildMigration(2, 3); - var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, A.Fake()); + var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log); A.CallTo(() => migrator_1_2.UpdateAsync()).Throws(new ArgumentException()); @@ -70,7 +114,7 @@ namespace Squidex.Infrastructure.Migrations var migrator_1_2 = BuildMigration(1, 2); var migrator_2_3 = BuildMigration(2, 3); - var migrator = new Migrator(status, new[] { migrator_0_1, migrator_0_2, migrator_1_2, migrator_2_3 }, A.Fake()); + var migrator = new Migrator(status, new[] { migrator_0_1, migrator_0_2, migrator_1_2, migrator_2_3 }, log); await migrator.MigrateAsync(); @@ -88,7 +132,7 @@ namespace Squidex.Infrastructure.Migrations var migrator_0_1 = BuildMigration(0, 1); var migrator_2_3 = BuildMigration(2, 3); - var migrator = new Migrator(status, new[] { migrator_0_1, migrator_2_3 }, A.Fake()); + var migrator = new Migrator(status, new[] { migrator_0_1, migrator_2_3 }, log); await Assert.ThrowsAsync(migrator.MigrateAsync); @@ -98,6 +142,20 @@ namespace Squidex.Infrastructure.Migrations A.CallTo(() => status.UnlockAsync(0)).MustHaveHappened(); } + [Fact] + public async Task Should_prevent_multiple_updates() + { + var migrator_0_1 = BuildMigration(0, 1); + var migrator_1_2 = BuildMigration(1, 2); + + var migrator = new Migrator(new InMemoryStatus(), new[] { migrator_0_1, migrator_1_2 }, log) { LockWaitMs = 2 }; + + await Task.WhenAll(Enumerable.Repeat(0, 10).Select(x => Task.Run(migrator.MigrateAsync))); + + A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once); + A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once); + } + private IMigration BuildMigration(int fromVersion, int toVersion) { var migration = A.Fake();