Browse Source

Fix/backup (#678)

* Fix backup and improve performance.

* Refactoring

* Code cleanup.
pull/679/head
Sebastian Stehle 5 years ago
committed by GitHub
parent
commit
8f07bc4709
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs
  2. 67
      backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs
  3. 2
      backend/src/Squidex.Domain.Apps.Entities/Contents/BackupContents.cs
  4. 24
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
  5. 1
      backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs
  6. 26
      backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs
  7. 31
      backend/src/Squidex.Infrastructure/EventSourcing/EventCommit.cs
  8. 8
      backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs
  9. 29
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs

31
backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs

@ -5,12 +5,17 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using Squidex.Infrastructure;
namespace Squidex.Domain.Apps.Entities.Backup
{
public sealed class RestoreContext : BackupContextBase
{
private readonly Dictionary<string, long> streams = new Dictionary<string, long>(1000);
private string? appStream;
public IBackupReader Reader { get; }
public DomainId PreviousAppId { get; set; }
@ -24,5 +29,31 @@ namespace Squidex.Domain.Apps.Entities.Backup
PreviousAppId = previousAppId;
}
public string GetStreamName(string streamName)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
if (streamName.StartsWith("app-", StringComparison.OrdinalIgnoreCase))
{
return appStream ??= $"app-{AppId}";
}
return streamName.Replace(PreviousAppId.ToString(), AppId.ToString());
}
public long GetStreamOffset(string streamName)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
if (!streams.TryGetValue(streamName, out var offset))
{
offset = EtagVersion.Empty;
}
streams[streamName] = offset + 1;
return offset;
}
}
}

67
backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs

@ -31,6 +31,7 @@ namespace Squidex.Domain.Apps.Entities.Backup
{
public sealed class RestoreGrain : GrainOfString, IRestoreGrain
{
private const int BatchSize = 500;
private readonly IBackupArchiveLocation backupArchiveLocation;
private readonly IClock clock;
private readonly ICommandBus commandBus;
@ -141,7 +142,10 @@ namespace Squidex.Domain.Apps.Entities.Backup
{
var handlers = CreateHandlers();
var logContext = (jobId: CurrentJob.Id.ToString(), jobUrl: CurrentJob.Url.ToString());
var logContext = (
jobId: CurrentJob.Id.ToString(),
jobUrl: CurrentJob.Url.ToString()
);
using (Profiler.StartSession())
{
@ -312,15 +316,51 @@ namespace Squidex.Domain.Apps.Entities.Backup
private async Task ReadEventsAsync(IBackupReader reader, IEnumerable<IBackupHandler> handlers)
{
var batch = new List<(string, Envelope<IEvent>)>(BatchSize);
await reader.ReadEventsAsync(streamNameResolver, eventDataFormatter, async storedEvent =>
{
await HandleEventAsync(reader, handlers, storedEvent.Stream, storedEvent.Event);
batch.Add(storedEvent);
if (batch.Count == BatchSize)
{
await CommitBatchAsync(reader, handlers, batch);
batch.Clear();
}
});
if (batch.Count > 0)
{
await CommitBatchAsync(reader, handlers, batch);
}
Log($"Reading {reader.ReadEvents} events and {reader.ReadAttachments} attachments completed.", true);
}
private async Task HandleEventAsync(IBackupReader reader, IEnumerable<IBackupHandler> handlers, string stream, Envelope<IEvent> @event)
private async Task CommitBatchAsync(IBackupReader reader, IEnumerable<IBackupHandler> handlers, List<(string, Envelope<IEvent>)> batch)
{
var commits = new List<EventCommit>(batch.Count);
foreach (var (stream, @event) in batch)
{
var handled = await HandleEventAsync(reader, handlers, stream, @event);
if (handled)
{
var streamName = restoreContext.GetStreamName(stream);
var streamOffset = restoreContext.GetStreamOffset(streamName);
commits.Add(EventCommit.Create(streamName, streamOffset, @event, eventDataFormatter));
}
}
await eventStore.AppendUnsafeAsync(commits);
Log($"Read {reader.ReadEvents} events and {reader.ReadAttachments} attachments.", true);
}
private async Task<bool> HandleEventAsync(IBackupReader reader, IEnumerable<IBackupHandler> handlers, string stream, Envelope<IEvent> @event)
{
if (@event.Payload is AppCreated appCreated)
{
@ -340,10 +380,6 @@ namespace Squidex.Domain.Apps.Entities.Backup
await CreateContextAsync(reader, previousAppId);
}
stream = stream.Replace(
restoreContext.PreviousAppId.ToString(),
restoreContext.AppId.ToString());
if (@event.Payload is SquidexEvent squidexEvent && squidexEvent.Actor != null)
{
if (restoreContext.UserMapping.TryMap(squidexEvent.Actor, out var newUser))
@ -357,9 +393,9 @@ namespace Squidex.Domain.Apps.Entities.Backup
appEvent.AppId = CurrentJob.AppId;
}
if (@event.Headers.TryGet(CommonHeaders.AggregateId, out var aggregateId) && aggregateId is JsonString s)
if (@event.Headers.TryGet(CommonHeaders.AggregateId, out var value) && value is JsonString idString)
{
var id = s.Value.Replace(
var id = idString.Value.Replace(
restoreContext.PreviousAppId.ToString(),
restoreContext.AppId.ToString());
@ -372,20 +408,11 @@ namespace Squidex.Domain.Apps.Entities.Backup
{
if (!await handler.RestoreEventAsync(@event, restoreContext))
{
return;
return false;
}
}
var eventData = eventDataFormatter.ToEventData(@event, @event.Headers.CommitId());
var eventCommit = new List<EventData>
{
eventData
};
await eventStore.AppendAsync(Guid.NewGuid(), stream, eventCommit);
Log($"Read {reader.ReadEvents} events and {reader.ReadAttachments} attachments.", true);
return true;
}
private async Task CreateContextAsync(IBackupReader reader, DomainId previousAppId)

2
backend/src/Squidex.Domain.Apps.Entities/Contents/BackupContents.cs

@ -204,7 +204,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
break;
case JsonObject obj:
ReplaceAssetUrl(obj, FieldSetter);
ReplaceAssetUrl(obj, JsonSetter);
break;
}
}

24
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs

@ -37,8 +37,6 @@ namespace Squidex.Infrastructure.EventSourcing
Guard.NotEmpty(commitId, nameof(commitId));
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
Guard.LessThan(events.Count, MaxCommitSize, "events.Count");
Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion));
@ -100,6 +98,28 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public async Task AppendUnsafeAsync(IEnumerable<EventCommit> commits)
{
Guard.NotNull(commits, nameof(commits));
using (Profiler.TraceMethod<MongoEventStore>())
{
var writes = new List<WriteModel<MongoEventCommit>>();
foreach (var commit in commits)
{
var document = BuildCommit(commit.Id, commit.StreamName, commit.Offset, commit.Events);
writes.Add(new InsertOneModel<MongoEventCommit>(document));
}
if (writes.Count > 0)
{
await Collection.BulkWriteAsync(writes, Unordered);
}
}
}
private async Task<long> GetEventStreamOffsetAsync(string streamName)
{
var document =

1
backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs

@ -21,6 +21,7 @@ namespace Squidex.Infrastructure.MongoDb
{
private const string CollectionFormat = "{0}Set";
protected static readonly BulkWriteOptions Unordered = new BulkWriteOptions { IsOrdered = true };
protected static readonly UpdateOptions Upsert = new UpdateOptions { IsUpsert = true };
protected static readonly ReplaceOptions UpsertReplace = new ReplaceOptions { IsUpsert = true };
protected static readonly SortDefinitionBuilder<TEntity> Sort = Builders<TEntity>.Sort;

26
backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs

@ -161,13 +161,13 @@ namespace Squidex.Infrastructure.Commands
{
Guard.NotNull(command, nameof(command));
MatchingVersion(command);
MatchingCreateCommand(command);
if (Version != EtagVersion.Empty && !(IsDeleted() && CanRecreate()))
{
throw new DomainObjectConflictException(uniqueId.ToString());
}
MatchingVersion(command);
MatchingCreateCommand(command);
}
private async Task EnsureCanUpdateAsync<TCommand>(TCommand command) where TCommand : ICommand
@ -176,11 +176,11 @@ namespace Squidex.Infrastructure.Commands
await EnsureLoadedAsync();
MatchingVersion(command);
MatchingCommand(command);
NotDeleted();
NotEmpty();
MatchingVersion(command);
MatchingCommand(command);
}
private async Task EnsureCanUpsertAsync<TCommand>(TCommand command) where TCommand : ICommand
@ -189,6 +189,11 @@ namespace Squidex.Infrastructure.Commands
await EnsureLoadedAsync();
if (IsDeleted() && !CanRecreate())
{
throw new DomainObjectDeletedException(uniqueId.ToString());
}
MatchingVersion(command);
if (Version <= EtagVersion.Empty)
@ -199,11 +204,6 @@ namespace Squidex.Infrastructure.Commands
{
MatchingCommand(command);
}
if (IsDeleted() && !CanRecreate())
{
throw new DomainObjectDeletedException(uniqueId.ToString());
}
}
private async Task EnsureCanDeleteAsync<TCommand>(TCommand command) where TCommand : ICommand
@ -212,10 +212,10 @@ namespace Squidex.Infrastructure.Commands
await EnsureLoadedAsync();
NotEmpty();
MatchingVersion(command);
MatchingCommand(command);
NotEmpty();
}
private void NotDeleted()

31
backend/src/Squidex.Infrastructure/EventSourcing/EventCommit.cs

@ -0,0 +1,31 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
namespace Squidex.Infrastructure.EventSourcing
{
public sealed record EventCommit(Guid Id, string StreamName, long Offset, ICollection<EventData> Events)
{
public static EventCommit Create(Guid id, string streamName, long offset, EventData @event)
{
return new EventCommit(id, streamName, offset, new List<EventData> { @event });
}
public static EventCommit Create(string streamName, long offset, Envelope<IEvent> envelope, IEventDataFormatter eventDataFormatter)
{
var id = Guid.NewGuid();
var eventData = eventDataFormatter.ToEventData(envelope, id);
return new EventCommit(id, streamName, offset, new List<EventData> { eventData });
}
}
}

8
backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs

@ -27,5 +27,13 @@ namespace Squidex.Infrastructure.EventSourcing
Task DeleteStreamAsync(string streamName);
IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null);
async Task AppendUnsafeAsync(IEnumerable<EventCommit> commits)
{
foreach (var commit in commits)
{
await AppendAsync(commit.Id, commit.StreamName, commit.Offset, commit.Events);
}
}
}
}

29
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs

@ -111,6 +111,35 @@ namespace Squidex.Infrastructure.EventSourcing
ShouldBeEquivalentTo(readEvents2, expected);
}
[Fact]
public async Task Should_append_event_unsafe()
{
var streamName = $"test-{Guid.NewGuid()}";
var events = new[]
{
new EventData("Type1", new EnvelopeHeaders(), "1"),
new EventData("Type2", new EnvelopeHeaders(), "2")
};
await Sut.AppendUnsafeAsync(new List<EventCommit>
{
new EventCommit(Guid.NewGuid(), streamName, -1, events)
});
var readEvents1 = await QueryAsync(streamName);
var readEvents2 = await QueryWithCallbackAsync(streamName);
var expected = new[]
{
new StoredEvent(streamName, "Position", 0, events[0]),
new StoredEvent(streamName, "Position", 1, events[1])
};
ShouldBeEquivalentTo(readEvents1, expected);
ShouldBeEquivalentTo(readEvents2, expected);
}
[Fact]
public async Task Should_subscribe_to_events()
{

Loading…
Cancel
Save