Browse Source

Migrator fixed for multi server scenario.

pull/208/head
Sebastian Stehle 8 years ago
parent
commit
7d63643253
  1. 21
      src/Squidex.Infrastructure.MongoDb/Migrations/MongoMigrationStatus.cs
  2. 20
      src/Squidex.Infrastructure/Migrations/Migrator.cs
  3. 66
      tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs

21
src/Squidex.Infrastructure.MongoDb/Migrations/MongoMigrationStatus.cs

@ -15,6 +15,7 @@ namespace Squidex.Infrastructure.Migrations
public sealed class MongoMigrationStatus : MongoRepositoryBase<MongoMigrationEntity>, IMigrationStatus
{
private const string DefaultId = "Default";
private static readonly FindOneAndUpdateOptions<MongoMigrationEntity> UpsertFind = new FindOneAndUpdateOptions<MongoMigrationEntity> { IsUpsert = true };
public MongoMigrationStatus(IMongoDatabase database)
: base(database)
@ -30,27 +31,15 @@ namespace Squidex.Infrastructure.Migrations
{
var entity = await Collection.Find(x => x.Id == DefaultId).FirstOrDefaultAsync();
if (entity == null)
{
try
{
await Collection.InsertOneAsync(new MongoMigrationEntity { Id = DefaultId });
}
catch (MongoWriteException ex)
{
if (ex.WriteError.Category != ServerErrorCategory.DuplicateKey)
{
throw;
}
}
}
return entity?.Version ?? 0;
}
public async Task<bool> TryLockAsync()
{
var entity = await Collection.FindOneAndUpdateAsync(x => x.Id == DefaultId, Update.Set(x => x.IsLocked, true));
var entity =
await Collection.FindOneAndUpdateAsync<MongoMigrationEntity>(x => x.Id == DefaultId,
Update.Set(x => x.IsLocked, true),
UpsertFind);
return entity?.IsLocked == false;
}

20
src/Squidex.Infrastructure/Migrations/Migrator.cs

@ -20,6 +20,8 @@ namespace Squidex.Infrastructure.Migrations
private readonly IEnumerable<IMigration> migrations;
private readonly ISemanticLog log;
public int LockWaitMs { get; set; } = 5000;
public Migrator(IMigrationStatus migrationStatus, IEnumerable<IMigration> migrations, ISemanticLog log)
{
Guard.NotNull(migrationStatus, nameof(migrationStatus));
@ -34,22 +36,24 @@ namespace Squidex.Infrastructure.Migrations
public async Task MigrateAsync()
{
var version = await migrationStatus.GetVersionAsync();
var lastMigrator = migrations.FirstOrDefault();
var version = 0;
if (lastMigrator != null && lastMigrator.ToVersion != version)
try
{
while (!await migrationStatus.TryLockAsync())
{
log.LogInformation(w => w
.WriteProperty("action", "Migrate")
.WriteProperty("mesage", "Waiting 5sec to acquire lock."));
.WriteProperty("mesage", $"Waiting {LockWaitMs}ms to acquire lock."));
await Task.Delay(5000);
await Task.Delay(LockWaitMs);
}
try
var lastMigrator = migrations.FirstOrDefault();
version = await migrationStatus.GetVersionAsync();
if (lastMigrator != null && lastMigrator.ToVersion != version)
{
var migrationPath = FindMigratorPath(version, lastMigrator.ToVersion).ToList();
@ -73,12 +77,12 @@ namespace Squidex.Infrastructure.Migrations
}
}
}
}
finally
{
await migrationStatus.UnlockAsync(version);
}
}
}
private IEnumerable<IMigration> FindMigratorPath(int fromVersion, int toVersion)
{

66
tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs

@ -7,9 +7,11 @@
// ==========================================================================
using System;
using System.Linq;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Migrations
@ -17,6 +19,48 @@ namespace Squidex.Infrastructure.Migrations
public sealed class MigratorTests
{
private readonly IMigrationStatus status = A.Fake<IMigrationStatus>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
public sealed class InMemoryStatus : IMigrationStatus
{
private readonly object lockObject = new object();
private int version;
private bool isLocked;
public Task<int> GetVersionAsync()
{
return Task.FromResult(version);
}
public Task<bool> TryLockAsync()
{
var lockAcquired = false;
lock (lockObject)
{
if (!isLocked)
{
isLocked = true;
lockAcquired = true;
}
}
return Task.FromResult(lockAcquired);
}
public Task UnlockAsync(int newVersion)
{
lock (lockObject)
{
isLocked = false;
version = newVersion;
}
return TaskHelper.Done;
}
}
public MigratorTests()
{
@ -31,7 +75,7 @@ namespace Squidex.Infrastructure.Migrations
var migrator_1_2 = BuildMigration(1, 2);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, A.Fake<ISemanticLog>());
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log);
await migrator.MigrateAsync();
@ -49,7 +93,7 @@ namespace Squidex.Infrastructure.Migrations
var migrator_1_2 = BuildMigration(1, 2);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, A.Fake<ISemanticLog>());
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log);
A.CallTo(() => migrator_1_2.UpdateAsync()).Throws(new ArgumentException());
@ -70,7 +114,7 @@ namespace Squidex.Infrastructure.Migrations
var migrator_1_2 = BuildMigration(1, 2);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_0_2, migrator_1_2, migrator_2_3 }, A.Fake<ISemanticLog>());
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_0_2, migrator_1_2, migrator_2_3 }, log);
await migrator.MigrateAsync();
@ -88,7 +132,7 @@ namespace Squidex.Infrastructure.Migrations
var migrator_0_1 = BuildMigration(0, 1);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_2_3 }, A.Fake<ISemanticLog>());
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_2_3 }, log);
await Assert.ThrowsAsync<InvalidOperationException>(migrator.MigrateAsync);
@ -98,6 +142,20 @@ namespace Squidex.Infrastructure.Migrations
A.CallTo(() => status.UnlockAsync(0)).MustHaveHappened();
}
[Fact]
public async Task Should_prevent_multiple_updates()
{
var migrator_0_1 = BuildMigration(0, 1);
var migrator_1_2 = BuildMigration(1, 2);
var migrator = new Migrator(new InMemoryStatus(), new[] { migrator_0_1, migrator_1_2 }, log) { LockWaitMs = 2 };
await Task.WhenAll(Enumerable.Repeat(0, 10).Select(x => Task.Run(migrator.MigrateAsync)));
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once);
}
private IMigration BuildMigration(int fromVersion, int toVersion)
{
var migration = A.Fake<IMigration>();

Loading…
Cancel
Save