|
|
|
@ -7,10 +7,10 @@ |
|
|
|
|
|
|
|
using System; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Net.Http; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using NodaTime; |
|
|
|
using Orleans; |
|
|
|
using Squidex.Domain.Apps.Entities.Backup.Helpers; |
|
|
|
using Squidex.Domain.Apps.Entities.Backup.State; |
|
|
|
using Squidex.Domain.Apps.Events; |
|
|
|
using Squidex.Domain.Apps.Events.Apps; |
|
|
|
@ -26,7 +26,6 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
{ |
|
|
|
public sealed class RestoreGrain : GrainOfString, IRestoreGrain |
|
|
|
{ |
|
|
|
private static readonly Duration UpdateDuration = Duration.FromSeconds(1); |
|
|
|
private readonly IClock clock; |
|
|
|
private readonly IAssetStore assetStore; |
|
|
|
private readonly IEventDataFormatter eventDataFormatter; |
|
|
|
@ -37,6 +36,7 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
private readonly IBackupArchiveLocation backupArchiveLocation; |
|
|
|
private readonly IStore<string> store; |
|
|
|
private readonly IEnumerable<BackupHandler> handlers; |
|
|
|
private RefToken actor; |
|
|
|
private RestoreState state = new RestoreState(); |
|
|
|
private IPersistence<RestoreState> persistence; |
|
|
|
|
|
|
|
@ -76,36 +76,56 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
|
|
|
|
public override async Task OnActivateAsync(string key) |
|
|
|
{ |
|
|
|
actor = new RefToken("subject", key); |
|
|
|
|
|
|
|
persistence = store.WithSnapshots<RestoreState, string>(GetType(), key, s => state = s); |
|
|
|
|
|
|
|
await persistence.ReadAsync(); |
|
|
|
await ReadAsync(); |
|
|
|
|
|
|
|
await CleanupAsync(); |
|
|
|
RecoverAfterRestart(); |
|
|
|
} |
|
|
|
|
|
|
|
public Task RestoreAsync(Uri url, RefToken user) |
|
|
|
private void RecoverAfterRestart() |
|
|
|
{ |
|
|
|
if (state.Job != null) |
|
|
|
{ |
|
|
|
throw new DomainException("A restore operation is already running."); |
|
|
|
} |
|
|
|
RecoverAfterRestartAsync().Forget(); |
|
|
|
} |
|
|
|
|
|
|
|
state.Job = new RestoreStateJob { Started = clock.GetCurrentInstant(), Uri = url, User = user }; |
|
|
|
private async Task RecoverAfterRestartAsync() |
|
|
|
{ |
|
|
|
if (state.Job?.Status == JobStatus.Started) |
|
|
|
{ |
|
|
|
Log("Failed due application restart"); |
|
|
|
|
|
|
|
return ProcessAsync(); |
|
|
|
await CleanupAsync(); |
|
|
|
await WriteAsync(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task CleanupAsync() |
|
|
|
public Task RestoreAsync(Uri url) |
|
|
|
{ |
|
|
|
if (state.Job != null) |
|
|
|
Guard.NotNull(url, nameof(url)); |
|
|
|
|
|
|
|
if (state.Job?.Status == JobStatus.Started) |
|
|
|
{ |
|
|
|
throw new DomainException("A restore operation is already running."); |
|
|
|
} |
|
|
|
|
|
|
|
state.Job = new RestoreStateJob |
|
|
|
{ |
|
|
|
state.Job.Status = "Failed due application restart"; |
|
|
|
state.Job.IsFailed = true; |
|
|
|
Id = Guid.NewGuid(), |
|
|
|
Started = clock.GetCurrentInstant(), |
|
|
|
Status = JobStatus.Started, |
|
|
|
Uri = url |
|
|
|
}; |
|
|
|
|
|
|
|
TryCleanup(); |
|
|
|
Process(); |
|
|
|
|
|
|
|
await persistence.WriteSnapshotAsync(state); |
|
|
|
} |
|
|
|
return TaskHelper.Done; |
|
|
|
} |
|
|
|
|
|
|
|
private void Process() |
|
|
|
{ |
|
|
|
ProcessAsync().Forget(); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ProcessAsync() |
|
|
|
@ -119,49 +139,40 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
.WriteProperty("status", "started") |
|
|
|
.WriteProperty("url", state.Job.Uri.ToString())); |
|
|
|
|
|
|
|
state.Job.Status = "Downloading Backup"; |
|
|
|
|
|
|
|
using (Profiler.Trace("Download")) |
|
|
|
{ |
|
|
|
await DownloadAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
state.Job.Status = "Downloaded Backup"; |
|
|
|
|
|
|
|
using (var stream = await backupArchiveLocation.OpenStreamAsync(state.Job.Id)) |
|
|
|
using (var reader = await backupArchiveLocation.OpenArchiveAsync(state.Job.Id)) |
|
|
|
{ |
|
|
|
using (var reader = new BackupReader(stream)) |
|
|
|
using (Profiler.Trace("ReadEvents")) |
|
|
|
{ |
|
|
|
await ReadEventsAsync(reader); |
|
|
|
} |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
using (Profiler.Trace("ReadEvents")) |
|
|
|
using (Profiler.TraceMethod(handler.GetType(), nameof(BackupHandler.RestoreAsync))) |
|
|
|
{ |
|
|
|
await ReadEventsAsync(reader); |
|
|
|
await handler.RestoreAsync(state.Job.AppId, reader); |
|
|
|
} |
|
|
|
|
|
|
|
state.Job.Status = "Events read"; |
|
|
|
Log($"Restored {handler.Name}"); |
|
|
|
} |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
using (Profiler.TraceMethod(handler.GetType(), nameof(BackupHandler.CompleteRestoreAsync))) |
|
|
|
{ |
|
|
|
using (Profiler.TraceMethod(handler.GetType(), nameof(BackupHandler.RestoreAsync))) |
|
|
|
{ |
|
|
|
await handler.RestoreAsync(state.Job.AppId, reader); |
|
|
|
} |
|
|
|
|
|
|
|
state.Job.Status = $"{handler} Processed"; |
|
|
|
await handler.CompleteRestoreAsync(state.Job.AppId, reader); |
|
|
|
} |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
using (Profiler.TraceMethod(handler.GetType(), nameof(BackupHandler.CompleteRestoreAsync))) |
|
|
|
{ |
|
|
|
await handler.CompleteRestoreAsync(state.Job.AppId, reader); |
|
|
|
} |
|
|
|
|
|
|
|
state.Job.Status = $"{handler} Completed"; |
|
|
|
} |
|
|
|
Log($"Completed {handler.Name}"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
state.Job = null; |
|
|
|
state.Job.Status = JobStatus.Failed; |
|
|
|
|
|
|
|
log.LogInformation(w => |
|
|
|
{ |
|
|
|
@ -174,17 +185,25 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
state.Job.IsFailed = true; |
|
|
|
if (ex is BackupRestoreException backupException) |
|
|
|
{ |
|
|
|
Log(backupException.Message); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
Log("Failed with internal error"); |
|
|
|
} |
|
|
|
|
|
|
|
if (state.Job.AppId != Guid.Empty) |
|
|
|
try |
|
|
|
{ |
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
await handler.CleanupRestoreAsync(state.Job.AppId, ex); |
|
|
|
} |
|
|
|
await CleanupAsync(ex); |
|
|
|
} |
|
|
|
catch (Exception ex2) |
|
|
|
{ |
|
|
|
ex = ex2; |
|
|
|
} |
|
|
|
|
|
|
|
TryCleanup(); |
|
|
|
state.Job.Status = JobStatus.Failed; |
|
|
|
|
|
|
|
log.LogError(ex, w => |
|
|
|
{ |
|
|
|
@ -197,37 +216,46 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await persistence.WriteSnapshotAsync(state); |
|
|
|
await WriteAsync(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task DownloadAsync() |
|
|
|
private async Task CleanupAsync(Exception exception = null) |
|
|
|
{ |
|
|
|
using (var client = new HttpClient()) |
|
|
|
await backupArchiveLocation.DeleteArchiveAsync(state.Job.Id); |
|
|
|
|
|
|
|
if (state.Job.AppId != Guid.Empty) |
|
|
|
{ |
|
|
|
using (var sourceStream = await client.GetStreamAsync(state.Job.Uri.ToString())) |
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
using (var targetStream = await backupArchiveLocation.OpenStreamAsync(state.Job.Id)) |
|
|
|
{ |
|
|
|
await sourceStream.CopyToAsync(targetStream); |
|
|
|
} |
|
|
|
await handler.CleanupRestoreAsync(state.Job.AppId, exception); |
|
|
|
} |
|
|
|
|
|
|
|
await appCleaner.EnqueueAppAsync(state.Job.AppId); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task DownloadAsync() |
|
|
|
{ |
|
|
|
Log("Downloading Backup"); |
|
|
|
|
|
|
|
await backupArchiveLocation.DownloadAsync(state.Job.Uri, state.Job.Id); |
|
|
|
|
|
|
|
Log("Downloaded Backup"); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ReadEventsAsync(BackupReader reader) |
|
|
|
{ |
|
|
|
await reader.ReadEventsAsync(async (storedEvent) => |
|
|
|
{ |
|
|
|
var eventData = storedEvent.Data; |
|
|
|
var eventParsed = eventDataFormatter.Parse(eventData); |
|
|
|
var @event = eventDataFormatter.Parse(storedEvent.Data); |
|
|
|
|
|
|
|
if (eventParsed.Payload is SquidexEvent squidexEvent) |
|
|
|
if (@event.Payload is SquidexEvent squidexEvent) |
|
|
|
{ |
|
|
|
squidexEvent.Actor = state.Job.User; |
|
|
|
squidexEvent.Actor = actor; |
|
|
|
} |
|
|
|
else if (eventParsed.Payload is AppCreated appCreated) |
|
|
|
else if (@event.Payload is AppCreated appCreated) |
|
|
|
{ |
|
|
|
state.Job.AppId = appCreated.AppId.Id; |
|
|
|
|
|
|
|
@ -236,13 +264,15 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
|
|
|
|
foreach (var handler in handlers) |
|
|
|
{ |
|
|
|
await handler.RestoreEventAsync(eventParsed, state.Job.AppId, reader); |
|
|
|
await handler.RestoreEventAsync(@event, state.Job.AppId, reader); |
|
|
|
} |
|
|
|
|
|
|
|
await eventStore.AppendAsync(Guid.NewGuid(), storedEvent.StreamName, new List<EventData> { storedEvent.Data }); |
|
|
|
|
|
|
|
state.Job.Status = $"Handled event {reader.ReadEvents} events and {reader.ReadAttachments} attachments"; |
|
|
|
Log($"Read {reader.ReadEvents} events and {reader.ReadAttachments} attachments."); |
|
|
|
}); |
|
|
|
|
|
|
|
Log("Reading events completed."); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task CheckCleanupStatus() |
|
|
|
@ -253,24 +283,31 @@ namespace Squidex.Domain.Apps.Entities.Backup |
|
|
|
|
|
|
|
if (status == CleanerStatus.Cleaning) |
|
|
|
{ |
|
|
|
throw new DomainException("The app is removed in the background."); |
|
|
|
throw new BackupRestoreException("The app is removed in the background."); |
|
|
|
} |
|
|
|
|
|
|
|
if (status == CleanerStatus.Cleaning) |
|
|
|
{ |
|
|
|
throw new DomainException("The app could not be cleaned."); |
|
|
|
throw new BackupRestoreException("The app could not be cleaned."); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void TryCleanup() |
|
|
|
private void Log(string message) |
|
|
|
{ |
|
|
|
if (state.Job.AppId != Guid.Empty) |
|
|
|
{ |
|
|
|
appCleaner.EnqueueAppAsync(state.Job.AppId).Forget(); |
|
|
|
} |
|
|
|
state.Job.Log.Add($"{clock.GetCurrentInstant()}: {message}"); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ReadAsync() |
|
|
|
{ |
|
|
|
await persistence.ReadAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task WriteAsync() |
|
|
|
{ |
|
|
|
await persistence.WriteSnapshotAsync(state); |
|
|
|
} |
|
|
|
|
|
|
|
public Task<J<IRestoreJob>> GetStateAsync() |
|
|
|
public Task<J<IRestoreJob>> GetJobAsync() |
|
|
|
{ |
|
|
|
return Task.FromResult<J<IRestoreJob>>(state.Job); |
|
|
|
} |
|
|
|
|