From 8f07bc47094d264d71c890b05bac6e6fac3148d7 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Tue, 23 Mar 2021 19:53:40 +0100 Subject: [PATCH] Fix/backup (#678) * Fix backup and improve performance. * Refactoring * Code cleanup. --- .../Backup/RestoreContext.cs | 31 +++++++++ .../Backup/RestoreGrain.cs | 67 +++++++++++++------ .../Contents/BackupContents.cs | 2 +- .../EventSourcing/MongoEventStore_Writer.cs | 24 ++++++- .../MongoDb/MongoRepositoryBase.cs | 1 + .../Commands/DomainObject.Execute.cs | 26 +++---- .../EventSourcing/EventCommit.cs | 31 +++++++++ .../EventSourcing/IEventStore.cs | 8 +++ .../EventSourcing/EventStoreTests.cs | 29 ++++++++ 9 files changed, 183 insertions(+), 36 deletions(-) create mode 100644 backend/src/Squidex.Infrastructure/EventSourcing/EventCommit.cs diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs index b0746c6de..787af4593 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs @@ -5,12 +5,17 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; +using System.Collections.Generic; using Squidex.Infrastructure; namespace Squidex.Domain.Apps.Entities.Backup { public sealed class RestoreContext : BackupContextBase { + private readonly Dictionary streams = new Dictionary(1000); + private string? 
appStream; + public IBackupReader Reader { get; } public DomainId PreviousAppId { get; set; } @@ -24,5 +29,31 @@ namespace Squidex.Domain.Apps.Entities.Backup PreviousAppId = previousAppId; } + + public string GetStreamName(string streamName) + { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + + if (streamName.StartsWith("app-", StringComparison.OrdinalIgnoreCase)) + { + return appStream ??= $"app-{AppId}"; + } + + return streamName.Replace(PreviousAppId.ToString(), AppId.ToString()); + } + + public long GetStreamOffset(string streamName) + { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + + if (!streams.TryGetValue(streamName, out var offset)) + { + offset = EtagVersion.Empty; + } + + streams[streamName] = offset + 1; + + return offset; + } } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs index bed3f1772..5a54f96f0 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs @@ -31,6 +31,7 @@ namespace Squidex.Domain.Apps.Entities.Backup { public sealed class RestoreGrain : GrainOfString, IRestoreGrain { + private const int BatchSize = 500; private readonly IBackupArchiveLocation backupArchiveLocation; private readonly IClock clock; private readonly ICommandBus commandBus; @@ -141,7 +142,10 @@ namespace Squidex.Domain.Apps.Entities.Backup { var handlers = CreateHandlers(); - var logContext = (jobId: CurrentJob.Id.ToString(), jobUrl: CurrentJob.Url.ToString()); + var logContext = ( + jobId: CurrentJob.Id.ToString(), + jobUrl: CurrentJob.Url.ToString() + ); using (Profiler.StartSession()) { @@ -312,15 +316,51 @@ namespace Squidex.Domain.Apps.Entities.Backup private async Task ReadEventsAsync(IBackupReader reader, IEnumerable handlers) { + var batch = new List<(string, Envelope)>(BatchSize); + await reader.ReadEventsAsync(streamNameResolver, eventDataFormatter, async 
storedEvent => { - await HandleEventAsync(reader, handlers, storedEvent.Stream, storedEvent.Event); + batch.Add(storedEvent); + + if (batch.Count == BatchSize) + { + await CommitBatchAsync(reader, handlers, batch); + + batch.Clear(); + } }); + if (batch.Count > 0) + { + await CommitBatchAsync(reader, handlers, batch); + } + Log($"Reading {reader.ReadEvents} events and {reader.ReadAttachments} attachments completed.", true); } - private async Task HandleEventAsync(IBackupReader reader, IEnumerable handlers, string stream, Envelope @event) + private async Task CommitBatchAsync(IBackupReader reader, IEnumerable handlers, List<(string, Envelope)> batch) + { + var commits = new List(batch.Count); + + foreach (var (stream, @event) in batch) + { + var handled = await HandleEventAsync(reader, handlers, stream, @event); + + if (handled) + { + var streamName = restoreContext.GetStreamName(stream); + var streamOffset = restoreContext.GetStreamOffset(streamName); + + commits.Add(EventCommit.Create(streamName, streamOffset, @event, eventDataFormatter)); + } + } + + await eventStore.AppendUnsafeAsync(commits); + + Log($"Read {reader.ReadEvents} events and {reader.ReadAttachments} attachments.", true); + } + + private async Task HandleEventAsync(IBackupReader reader, IEnumerable handlers, string stream, Envelope @event) { if (@event.Payload is AppCreated appCreated) { @@ -340,10 +380,6 @@ namespace Squidex.Domain.Apps.Entities.Backup await CreateContextAsync(reader, previousAppId); } - stream = stream.Replace( - restoreContext.PreviousAppId.ToString(), - restoreContext.AppId.ToString()); - if (@event.Payload is SquidexEvent squidexEvent && squidexEvent.Actor != null) { if (restoreContext.UserMapping.TryMap(squidexEvent.Actor, out var newUser)) @@ -357,9 +393,9 @@ namespace Squidex.Domain.Apps.Entities.Backup appEvent.AppId = CurrentJob.AppId; } - if (@event.Headers.TryGet(CommonHeaders.AggregateId, out var aggregateId) && aggregateId is JsonString s) + if 
(@event.Headers.TryGet(CommonHeaders.AggregateId, out var value) && value is JsonString idString) { - var id = s.Value.Replace( + var id = idString.Value.Replace( restoreContext.PreviousAppId.ToString(), restoreContext.AppId.ToString()); @@ -372,20 +408,11 @@ namespace Squidex.Domain.Apps.Entities.Backup { if (!await handler.RestoreEventAsync(@event, restoreContext)) { - return; + return false; } } - var eventData = eventDataFormatter.ToEventData(@event, @event.Headers.CommitId()); - - var eventCommit = new List - { - eventData - }; - - await eventStore.AppendAsync(Guid.NewGuid(), stream, eventCommit); - - Log($"Read {reader.ReadEvents} events and {reader.ReadAttachments} attachments.", true); + return true; } private async Task CreateContextAsync(IBackupReader reader, DomainId previousAppId) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/BackupContents.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/BackupContents.cs index c1ffd7329..8d04a2742 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/BackupContents.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/BackupContents.cs @@ -204,7 +204,7 @@ namespace Squidex.Domain.Apps.Entities.Contents break; case JsonObject obj: - ReplaceAssetUrl(obj, FieldSetter); + ReplaceAssetUrl(obj, JsonSetter); break; } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index 33faa6c56..600d700d8 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -37,8 +37,6 @@ namespace Squidex.Infrastructure.EventSourcing Guard.NotEmpty(commitId, nameof(commitId)); Guard.NotNullOrEmpty(streamName, nameof(streamName)); Guard.NotNull(events, nameof(events)); - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - Guard.NotNull(events, 
nameof(events)); Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); @@ -100,6 +98,28 @@ namespace Squidex.Infrastructure.EventSourcing } } + public async Task AppendUnsafeAsync(IEnumerable commits) + { + Guard.NotNull(commits, nameof(commits)); + + using (Profiler.TraceMethod()) + { + var writes = new List>(); + + foreach (var commit in commits) + { + var document = BuildCommit(commit.Id, commit.StreamName, commit.Offset, commit.Events); + + writes.Add(new InsertOneModel(document)); + } + + if (writes.Count > 0) + { + await Collection.BulkWriteAsync(writes, Unordered); + } + } + } + private async Task GetEventStreamOffsetAsync(string streamName) { var document = diff --git a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs index 691b8192a..95bee75f9 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs @@ -21,6 +21,7 @@ namespace Squidex.Infrastructure.MongoDb { private const string CollectionFormat = "{0}Set"; + protected static readonly BulkWriteOptions Unordered = new BulkWriteOptions { IsOrdered = false }; protected static readonly UpdateOptions Upsert = new UpdateOptions { IsUpsert = true }; protected static readonly ReplaceOptions UpsertReplace = new ReplaceOptions { IsUpsert = true }; protected static readonly SortDefinitionBuilder Sort = Builders.Sort; diff --git a/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs b/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs index f0e166f06..c8cb2e1f7 100644 --- a/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs +++ b/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs @@ -161,13 +161,13 @@ namespace Squidex.Infrastructure.Commands { 
Guard.NotNull(command, nameof(command)); - MatchingVersion(command); - MatchingCreateCommand(command); - if (Version != EtagVersion.Empty && !(IsDeleted() && CanRecreate())) { throw new DomainObjectConflictException(uniqueId.ToString()); } + + MatchingVersion(command); + MatchingCreateCommand(command); } private async Task EnsureCanUpdateAsync(TCommand command) where TCommand : ICommand @@ -176,11 +176,11 @@ namespace Squidex.Infrastructure.Commands await EnsureLoadedAsync(); - MatchingVersion(command); - MatchingCommand(command); - NotDeleted(); NotEmpty(); + + MatchingVersion(command); + MatchingCommand(command); } private async Task EnsureCanUpsertAsync(TCommand command) where TCommand : ICommand @@ -189,6 +189,11 @@ namespace Squidex.Infrastructure.Commands await EnsureLoadedAsync(); + if (IsDeleted() && !CanRecreate()) + { + throw new DomainObjectDeletedException(uniqueId.ToString()); + } + MatchingVersion(command); if (Version <= EtagVersion.Empty) @@ -199,11 +204,6 @@ namespace Squidex.Infrastructure.Commands { MatchingCommand(command); } - - if (IsDeleted() && !CanRecreate()) - { - throw new DomainObjectDeletedException(uniqueId.ToString()); - } } private async Task EnsureCanDeleteAsync(TCommand command) where TCommand : ICommand @@ -212,10 +212,10 @@ namespace Squidex.Infrastructure.Commands await EnsureLoadedAsync(); + NotEmpty(); + MatchingVersion(command); MatchingCommand(command); - - NotEmpty(); } private void NotDeleted() diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/EventCommit.cs b/backend/src/Squidex.Infrastructure/EventSourcing/EventCommit.cs new file mode 100644 index 000000000..ff57bbcd1 --- /dev/null +++ b/backend/src/Squidex.Infrastructure/EventSourcing/EventCommit.cs @@ -0,0 +1,31 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All 
rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; + +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed record EventCommit(Guid Id, string StreamName, long Offset, ICollection Events) + { + public static EventCommit Create(Guid id, string streamName, long offset, EventData @event) + { + return new EventCommit(id, streamName, offset, new List { @event }); + } + + public static EventCommit Create(string streamName, long offset, Envelope envelope, IEventDataFormatter eventDataFormatter) + { + var id = Guid.NewGuid(); + + var eventData = eventDataFormatter.ToEventData(envelope, id); + + return new EventCommit(id, streamName, offset, new List { eventData }); + } + } +} diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index 4f594a291..bd9961315 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -27,5 +27,13 @@ namespace Squidex.Infrastructure.EventSourcing Task DeleteStreamAsync(string streamName); IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? 
position = null); + + async Task AppendUnsafeAsync(IEnumerable commits) + { + foreach (var commit in commits) + { + await AppendAsync(commit.Id, commit.StreamName, commit.Offset, commit.Events); + } + } } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 14b6d6739..647b8d90e 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -111,6 +111,35 @@ namespace Squidex.Infrastructure.EventSourcing ShouldBeEquivalentTo(readEvents2, expected); } + [Fact] + public async Task Should_append_event_unsafe() + { + var streamName = $"test-{Guid.NewGuid()}"; + + var events = new[] + { + new EventData("Type1", new EnvelopeHeaders(), "1"), + new EventData("Type2", new EnvelopeHeaders(), "2") + }; + + await Sut.AppendUnsafeAsync(new List + { + new EventCommit(Guid.NewGuid(), streamName, -1, events) + }); + + var readEvents1 = await QueryAsync(streamName); + var readEvents2 = await QueryWithCallbackAsync(streamName); + + var expected = new[] + { + new StoredEvent(streamName, "Position", 0, events[0]), + new StoredEvent(streamName, "Position", 1, events[1]) + }; + + ShouldBeEquivalentTo(readEvents1, expected); + ShouldBeEquivalentTo(readEvents2, expected); + } + [Fact] public async Task Should_subscribe_to_events() {