mirror of https://github.com/Squidex/squidex.git
Browse Source
* More progress with jobs * Jobs V1 * Tests updated * Fix jobs. * Fix languages. * Fix exception handling. * Revert restore endpoint behavior. * Fix endpoint.pull/1063/head
committed by
GitHub
119 changed files with 3125 additions and 2707 deletions
@ -0,0 +1,50 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
namespace Migrations.Migrations.Backup; |
|||
|
|||
public sealed class BackupState |
|||
{ |
|||
public List<BackupJob> Jobs { get; set; } = []; |
|||
|
|||
public JobsState ToJob() |
|||
{ |
|||
var result = new JobsState |
|||
{ |
|||
Jobs = Jobs.Select(ToState).ToList() |
|||
}; |
|||
|
|||
return result; |
|||
} |
|||
|
|||
private static Job ToState(BackupJob source) |
|||
{ |
|||
return new Job |
|||
{ |
|||
Arguments = [], |
|||
Id = source.Id, |
|||
TaskName = "backup", |
|||
Started = source.Started, |
|||
Stopped = source.Stopped, |
|||
File = new JobFile($"app-{source.Started:yyyy-MM-dd}.zip", "application/zip"), |
|||
Status = source.Status switch |
|||
{ |
|||
BackupStatus.Completed => JobStatus.Completed, |
|||
BackupStatus.Created => JobStatus.Created, |
|||
BackupStatus.Failed => JobStatus.Failed, |
|||
BackupStatus.Started => JobStatus.Started, |
|||
_ => JobStatus.Failed |
|||
}, |
|||
Log = |
|||
[ |
|||
new JobLogMessage(source.Stopped ?? source.Started, $"Total events: {source.HandledEvents}, assets: {source.HandledAssets}") |
|||
] |
|||
}; |
|||
} |
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Infrastructure.Migrations; |
|||
using Squidex.Infrastructure.States; |
|||
|
|||
namespace Migrations.Migrations.Backup; |
|||
|
|||
public sealed class ConvertBackup : IMigration |
|||
{ |
|||
private readonly ISnapshotStore<BackupState> stateBackups; |
|||
private readonly ISnapshotStore<JobsState> stateJobs; |
|||
|
|||
public ConvertBackup( |
|||
ISnapshotStore<BackupState> stateBackups, |
|||
ISnapshotStore<JobsState> stateJobs) |
|||
{ |
|||
this.stateBackups = stateBackups; |
|||
this.stateJobs = stateJobs; |
|||
} |
|||
|
|||
public async Task UpdateAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
await foreach (var state in stateBackups.ReadAllAsync(ct)) |
|||
{ |
|||
var job = state.Value.ToJob(); |
|||
|
|||
await stateJobs.WriteAsync(new SnapshotWriteJob<JobsState>(state.Key, job, 0), ct); |
|||
} |
|||
|
|||
await stateBackups.ClearAsync(ct); |
|||
} |
|||
} |
|||
@ -0,0 +1,141 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Core.Apps; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Domain.Apps.Events; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.Translations; |
|||
using Squidex.Shared.Users; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed class BackupJob : IJobRunner |
|||
{ |
|||
public const string TaskName = "backup"; |
|||
public const string ArgAppId = "appId"; |
|||
public const string ArgAppName = "appName"; |
|||
|
|||
private readonly IBackupArchiveLocation backupArchiveLocation; |
|||
private readonly IBackupArchiveStore backupArchiveStore; |
|||
private readonly IBackupHandlerFactory backupHandlerFactory; |
|||
private readonly IEventFormatter eventFormatter; |
|||
private readonly IEventStore eventStore; |
|||
private readonly IUserResolver userResolver; |
|||
|
|||
public string Name => TaskName; |
|||
|
|||
public int MaxJobs => 10; |
|||
|
|||
public BackupJob( |
|||
IBackupArchiveLocation backupArchiveLocation, |
|||
IBackupArchiveStore backupArchiveStore, |
|||
IBackupHandlerFactory backupHandlerFactory, |
|||
IEventFormatter eventFormatter, |
|||
IEventStore eventStore, |
|||
IUserResolver userResolver) |
|||
{ |
|||
this.backupArchiveLocation = backupArchiveLocation; |
|||
this.backupArchiveStore = backupArchiveStore; |
|||
this.backupHandlerFactory = backupHandlerFactory; |
|||
this.eventFormatter = eventFormatter; |
|||
this.eventStore = eventStore; |
|||
this.userResolver = userResolver; |
|||
} |
|||
|
|||
public static JobRequest BuildRequest(RefToken actor, App app) |
|||
{ |
|||
return JobRequest.Create( |
|||
actor, |
|||
TaskName, |
|||
new Dictionary<string, string> |
|||
{ |
|||
[ArgAppId] = app.Id.ToString(), |
|||
[ArgAppName] = app.Name |
|||
}); |
|||
} |
|||
|
|||
public Task DownloadAsync(Job state, Stream stream, |
|||
CancellationToken ct) |
|||
{ |
|||
return backupArchiveStore.DownloadAsync(state.Id, stream, ct); |
|||
} |
|||
|
|||
public Task CleanupAsync(Job state) |
|||
{ |
|||
return backupArchiveStore.DeleteAsync(state.Id, default); |
|||
} |
|||
|
|||
public async Task RunAsync(JobRunContext context, |
|||
CancellationToken ct) |
|||
{ |
|||
var appId = context.OwnerId; |
|||
var appName = context.Job.Arguments.GetValueOrDefault(ArgAppName, "app"); |
|||
|
|||
// We store the file in a the asset store and make the information available.
|
|||
context.Job.File = new JobFile($"backup-{appName}-{context.Job.Started:yyyy-MM-dd_HH-mm-ss}.zip", "application/zip"); |
|||
|
|||
// Use a readable name to describe the job.
|
|||
context.Job.Description = T.Get("job.backup"); |
|||
|
|||
var handlers = backupHandlerFactory.CreateMany(); |
|||
|
|||
await using var stream = backupArchiveLocation.OpenStream(context.Job.Id); |
|||
|
|||
using (var writer = await backupArchiveLocation.OpenWriterAsync(stream, ct)) |
|||
{ |
|||
await writer.WriteVersionAsync(); |
|||
|
|||
var backupUsers = new UserMapping(context.Actor); |
|||
var backupContext = new BackupContext(appId, backupUsers, writer); |
|||
|
|||
var streamFilter = StreamFilter.Prefix($"[^\\-]*-{appId}"); |
|||
|
|||
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct)) |
|||
{ |
|||
var @event = eventFormatter.Parse(storedEvent); |
|||
|
|||
if (@event.Payload is SquidexEvent { Actor: { } } squidexEvent) |
|||
{ |
|||
backupUsers.Backup(squidexEvent.Actor); |
|||
} |
|||
|
|||
foreach (var handler in handlers) |
|||
{ |
|||
await handler.BackupEventAsync(@event, backupContext, ct); |
|||
} |
|||
|
|||
writer.WriteEvent(storedEvent, ct); |
|||
|
|||
await context.LogAsync($"Total events: {writer.WrittenEvents}, assets: {writer.WrittenAttachments}", true); |
|||
} |
|||
|
|||
foreach (var handler in handlers) |
|||
{ |
|||
ct.ThrowIfCancellationRequested(); |
|||
|
|||
await handler.BackupAsync(backupContext, ct); |
|||
} |
|||
|
|||
foreach (var handler in handlers) |
|||
{ |
|||
ct.ThrowIfCancellationRequested(); |
|||
|
|||
await handler.CompleteBackupAsync(backupContext); |
|||
} |
|||
|
|||
await backupUsers.StoreAsync(writer, userResolver, ct); |
|||
} |
|||
|
|||
stream.Position = 0; |
|||
|
|||
ct.ThrowIfCancellationRequested(); |
|||
|
|||
await backupArchiveStore.UploadAsync(context.Job.Id, stream, ct); |
|||
} |
|||
} |
|||
@ -1,54 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Entities.Backup.State; |
|||
using Squidex.Infrastructure; |
|||
|
|||
#pragma warning disable MA0040 // Flow the cancellation token
|
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed partial class BackupProcessor |
|||
{ |
|||
// Use a run to store all state that is necessary for a single run.
|
|||
private sealed class Run : IDisposable |
|||
{ |
|||
private readonly CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
|||
private readonly CancellationTokenSource cancellationLinked; |
|||
|
|||
public IEnumerable<IBackupHandler> Handlers { get; init; } |
|||
|
|||
public RefToken Actor { get; init; } |
|||
|
|||
public BackupJob Job { get; init; } |
|||
|
|||
public CancellationToken CancellationToken => cancellationLinked.Token; |
|||
|
|||
public Run(CancellationToken ct) |
|||
{ |
|||
cancellationLinked = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellationSource.Token); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
cancellationSource.Dispose(); |
|||
cancellationLinked.Dispose(); |
|||
} |
|||
|
|||
public void Cancel() |
|||
{ |
|||
try |
|||
{ |
|||
cancellationSource.Cancel(); |
|||
} |
|||
catch (ObjectDisposedException) |
|||
{ |
|||
// Cancellation token might have been disposed, if the run is completed.
|
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,269 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.Logging; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Backup.State; |
|||
using Squidex.Domain.Apps.Events; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Squidex.Infrastructure.Translations; |
|||
using Squidex.Shared.Users; |
|||
|
|||
#pragma warning disable MA0040 // Flow the cancellation token
|
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed partial class BackupProcessor |
|||
{ |
|||
private readonly IBackupArchiveLocation backupArchiveLocation; |
|||
private readonly IBackupArchiveStore backupArchiveStore; |
|||
private readonly IBackupHandlerFactory backupHandlerFactory; |
|||
private readonly IEventFormatter eventFormatter; |
|||
private readonly IEventStore eventStore; |
|||
private readonly IUserResolver userResolver; |
|||
private readonly ILogger<BackupProcessor> log; |
|||
private readonly SimpleState<BackupState> state; |
|||
private readonly ReentrantScheduler scheduler = new ReentrantScheduler(1); |
|||
private readonly DomainId appId; |
|||
private Run? currentRun; |
|||
|
|||
public IClock Clock { get; set; } = SystemClock.Instance; |
|||
|
|||
public BackupProcessor( |
|||
DomainId appId, |
|||
IBackupArchiveLocation backupArchiveLocation, |
|||
IBackupArchiveStore backupArchiveStore, |
|||
IBackupHandlerFactory backupHandlerFactory, |
|||
IEventFormatter eventFormatter, |
|||
IEventStore eventStore, |
|||
IPersistenceFactory<BackupState> persistenceFactory, |
|||
IUserResolver userResolver, |
|||
ILogger<BackupProcessor> log) |
|||
{ |
|||
this.appId = appId; |
|||
this.backupArchiveLocation = backupArchiveLocation; |
|||
this.backupArchiveStore = backupArchiveStore; |
|||
this.backupHandlerFactory = backupHandlerFactory; |
|||
this.eventFormatter = eventFormatter; |
|||
this.eventStore = eventStore; |
|||
this.userResolver = userResolver; |
|||
this.log = log; |
|||
|
|||
// Enable locking for the parallel operations that might write stuff.
|
|||
state = new SimpleState<BackupState>(persistenceFactory, GetType(), appId, true); |
|||
} |
|||
|
|||
public async Task LoadAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
await state.LoadAsync(ct); |
|||
|
|||
if (state.Value.Jobs.RemoveAll(x => x.Stopped == null) > 0) |
|||
{ |
|||
// This should actually never happen, so we log with warning.
|
|||
log.LogWarning("Removed unfinished backups for app {appId} after start.", appId); |
|||
|
|||
await state.WriteAsync(ct); |
|||
} |
|||
} |
|||
|
|||
public Task ClearAsync() |
|||
{ |
|||
return scheduler.ScheduleAsync(async _ => |
|||
{ |
|||
log.LogInformation("Clearing backups for app {appId}.", appId); |
|||
|
|||
foreach (var backup in state.Value.Jobs) |
|||
{ |
|||
await backupArchiveStore.DeleteAsync(backup.Id, default); |
|||
} |
|||
|
|||
await state.ClearAsync(default); |
|||
}); |
|||
} |
|||
|
|||
public Task BackupAsync(RefToken actor, |
|||
CancellationToken ct) |
|||
{ |
|||
return scheduler.ScheduleAsync(async _ => |
|||
{ |
|||
if (currentRun != null) |
|||
{ |
|||
throw new DomainException(T.Get("backups.alreadyRunning")); |
|||
} |
|||
|
|||
state.Value.EnsureCanStart(); |
|||
|
|||
// Set the current run first to indicate that we are running a rule at the moment.
|
|||
var run = currentRun = new Run(ct) |
|||
{ |
|||
Actor = actor, |
|||
Job = new BackupJob |
|||
{ |
|||
Id = DomainId.NewGuid(), |
|||
Started = Clock.GetCurrentInstant(), |
|||
Status = JobStatus.Started |
|||
}, |
|||
Handlers = backupHandlerFactory.CreateMany() |
|||
}; |
|||
|
|||
log.LogInformation("Starting new backup with backup id '{backupId}' for app {appId}.", run.Job.Id, appId); |
|||
|
|||
state.Value.Jobs.Insert(0, run.Job); |
|||
try |
|||
{ |
|||
await ProcessAsync(run, run.CancellationToken); |
|||
} |
|||
finally |
|||
{ |
|||
// Unset the run to indicate that we are done.
|
|||
currentRun.Dispose(); |
|||
currentRun = null; |
|||
} |
|||
}, ct); |
|||
} |
|||
|
|||
private async Task ProcessAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
try |
|||
{ |
|||
await state.WriteAsync(run.CancellationToken); |
|||
|
|||
await using (var stream = backupArchiveLocation.OpenStream(run.Job.Id)) |
|||
{ |
|||
using (var writer = await backupArchiveLocation.OpenWriterAsync(stream, ct)) |
|||
{ |
|||
await writer.WriteVersionAsync(); |
|||
|
|||
var backupUsers = new UserMapping(run.Actor); |
|||
var backupContext = new BackupContext(appId, backupUsers, writer); |
|||
|
|||
var streamFilter = StreamFilter.Prefix($"[^\\-]*-{appId}"); |
|||
|
|||
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct)) |
|||
{ |
|||
var @event = eventFormatter.Parse(storedEvent); |
|||
|
|||
if (@event.Payload is SquidexEvent { Actor: { } } squidexEvent) |
|||
{ |
|||
backupUsers.Backup(squidexEvent.Actor); |
|||
} |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
await handler.BackupEventAsync(@event, backupContext, ct); |
|||
} |
|||
|
|||
writer.WriteEvent(storedEvent, ct); |
|||
|
|||
await LogAsync(run, writer.WrittenEvents, writer.WrittenAttachments); |
|||
} |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
ct.ThrowIfCancellationRequested(); |
|||
|
|||
await handler.BackupAsync(backupContext, ct); |
|||
} |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
ct.ThrowIfCancellationRequested(); |
|||
|
|||
await handler.CompleteBackupAsync(backupContext); |
|||
} |
|||
|
|||
await backupUsers.StoreAsync(writer, userResolver, ct); |
|||
} |
|||
|
|||
stream.Position = 0; |
|||
|
|||
ct.ThrowIfCancellationRequested(); |
|||
|
|||
await backupArchiveStore.UploadAsync(run.Job.Id, stream, ct); |
|||
} |
|||
|
|||
await SetStatusAsync(run, JobStatus.Completed); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
await SetStatusAsync(run, JobStatus.Failed); |
|||
|
|||
log.LogError(ex, "Failed to make backup with backup id '{backupId}'.", run.Job.Id); |
|||
} |
|||
} |
|||
|
|||
public Task DeleteAsync(DomainId id) |
|||
{ |
|||
return scheduler.ScheduleAsync(async _ => |
|||
{ |
|||
var job = state.Value.Jobs.Find(x => x.Id == id); |
|||
|
|||
if (job == null) |
|||
{ |
|||
throw new DomainObjectNotFoundException(id.ToString()); |
|||
} |
|||
|
|||
log.LogInformation("Deleting backup with backup id '{backupId}' for app {appId}.", job.Id, appId); |
|||
|
|||
if (currentRun?.Job == job) |
|||
{ |
|||
currentRun.Cancel(); |
|||
} |
|||
else |
|||
{ |
|||
await RemoveAsync(job); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private async Task RemoveAsync(BackupJob job) |
|||
{ |
|||
try |
|||
{ |
|||
await backupArchiveStore.DeleteAsync(job.Id); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, "Failed to make remove with backup id '{backupId}'.", job.Id); |
|||
} |
|||
|
|||
state.Value.Jobs.Remove(job); |
|||
|
|||
await state.WriteAsync(); |
|||
} |
|||
|
|||
private Task SetStatusAsync(Run run, JobStatus status) |
|||
{ |
|||
var now = Clock.GetCurrentInstant(); |
|||
|
|||
run.Job.Status = status; |
|||
|
|||
if (status == JobStatus.Failed || status == JobStatus.Completed) |
|||
{ |
|||
run.Job.Stopped = now; |
|||
} |
|||
else if (status == JobStatus.Started) |
|||
{ |
|||
run.Job.Started = now; |
|||
} |
|||
|
|||
return state.WriteAsync(ct: default); |
|||
} |
|||
|
|||
private Task LogAsync(Run run, int numEvents, int numAttachments) |
|||
{ |
|||
run.Job.HandledEvents = numEvents; |
|||
run.Job.HandledAssets = numAttachments; |
|||
|
|||
return state.WriteAsync(100, run.CancellationToken); |
|||
} |
|||
} |
|||
@ -1,98 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Core.Apps; |
|||
using Squidex.Domain.Apps.Entities.Backup.State; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed class BackupService : IBackupService, IDeleter |
|||
{ |
|||
private readonly SimpleState<BackupRestoreState> restoreState; |
|||
private readonly IPersistenceFactory<BackupState> persistenceFactoryBackup; |
|||
private readonly IMessageBus messaging; |
|||
|
|||
public BackupService( |
|||
IPersistenceFactory<BackupRestoreState> persistenceFactoryRestore, |
|||
IPersistenceFactory<BackupState> persistenceFactoryBackup, |
|||
IMessageBus messaging) |
|||
{ |
|||
this.persistenceFactoryBackup = persistenceFactoryBackup; |
|||
this.messaging = messaging; |
|||
|
|||
restoreState = new SimpleState<BackupRestoreState>(persistenceFactoryRestore, GetType(), "Default"); |
|||
} |
|||
|
|||
Task IDeleter.DeleteAppAsync(App app, |
|||
CancellationToken ct) |
|||
{ |
|||
return messaging.PublishAsync(new BackupClear(app.Id), ct: ct); |
|||
} |
|||
|
|||
public async Task StartBackupAsync(DomainId appId, RefToken actor, |
|||
CancellationToken ct = default) |
|||
{ |
|||
var state = await GetStateAsync(appId, ct); |
|||
|
|||
state.Value.EnsureCanStart(); |
|||
|
|||
await messaging.PublishAsync(new BackupStart(appId, actor), ct: ct); |
|||
} |
|||
|
|||
public async Task StartRestoreAsync(RefToken actor, Uri url, string? newAppName, |
|||
CancellationToken ct = default) |
|||
{ |
|||
await restoreState.LoadAsync(ct); |
|||
|
|||
restoreState.Value.Job?.EnsureCanStart(); |
|||
|
|||
await messaging.PublishAsync(new BackupRestore(actor, url, newAppName), ct: ct); |
|||
} |
|||
|
|||
public Task DeleteBackupAsync(DomainId appId, DomainId backupId, |
|||
CancellationToken ct = default) |
|||
{ |
|||
return messaging.PublishAsync(new BackupDelete(appId, backupId), ct: ct); |
|||
} |
|||
|
|||
public async Task<IRestoreJob> GetRestoreAsync( |
|||
CancellationToken ct = default) |
|||
{ |
|||
await restoreState.LoadAsync(ct); |
|||
|
|||
return restoreState.Value.Job ?? new RestoreJob(); |
|||
} |
|||
|
|||
public async Task<List<IBackupJob>> GetBackupsAsync(DomainId appId, |
|||
CancellationToken ct = default) |
|||
{ |
|||
var state = await GetStateAsync(appId, ct); |
|||
|
|||
return state.Value.Jobs.OfType<IBackupJob>().ToList(); |
|||
} |
|||
|
|||
public async Task<IBackupJob?> GetBackupAsync(DomainId appId, DomainId backupId, |
|||
CancellationToken ct = default) |
|||
{ |
|||
var state = await GetStateAsync(appId, ct); |
|||
|
|||
return state.Value.Jobs.Find(x => x.Id == backupId); |
|||
} |
|||
|
|||
private async Task<SimpleState<BackupState>> GetStateAsync(DomainId appId, |
|||
CancellationToken ct) |
|||
{ |
|||
var state = new SimpleState<BackupState>(persistenceFactoryBackup, GetType(), appId); |
|||
|
|||
await state.LoadAsync(ct); |
|||
|
|||
return state; |
|||
} |
|||
} |
|||
@ -1,88 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Squidex.Hosting; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed class BackupWorker : |
|||
IMessageHandler<BackupRestore>, |
|||
IMessageHandler<BackupStart>, |
|||
IMessageHandler<BackupDelete>, |
|||
IMessageHandler<BackupClear>, |
|||
IInitializable |
|||
{ |
|||
private readonly Dictionary<DomainId, Task<BackupProcessor>> backupProcessors = []; |
|||
private readonly Func<DomainId, BackupProcessor> backupFactory; |
|||
private readonly RestoreProcessor restoreProcessor; |
|||
|
|||
public BackupWorker(IServiceProvider serviceProvider) |
|||
{ |
|||
var objectFactory = ActivatorUtilities.CreateFactory(typeof(BackupProcessor), [typeof(DomainId)]); |
|||
|
|||
backupFactory = key => |
|||
{ |
|||
return (BackupProcessor)objectFactory(serviceProvider, new object[] { key }); |
|||
}; |
|||
|
|||
restoreProcessor = serviceProvider.GetRequiredService<RestoreProcessor>(); |
|||
} |
|||
|
|||
public Task InitializeAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
return restoreProcessor.LoadAsync(ct); |
|||
} |
|||
|
|||
public Task HandleAsync(BackupRestore message, |
|||
CancellationToken ct) |
|||
{ |
|||
return restoreProcessor.RestoreAsync(message.Url, message.Actor, message.NewAppName, ct); |
|||
} |
|||
|
|||
public async Task HandleAsync(BackupStart message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetBackupProcessorAsync(message.AppId); |
|||
|
|||
await processor.BackupAsync(message.Actor, ct); |
|||
} |
|||
|
|||
public async Task HandleAsync(BackupDelete message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetBackupProcessorAsync(message.AppId); |
|||
|
|||
await processor.DeleteAsync(message.Id); |
|||
} |
|||
|
|||
public async Task HandleAsync(BackupClear message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetBackupProcessorAsync(message.AppId); |
|||
|
|||
await processor.ClearAsync(); |
|||
} |
|||
|
|||
private Task<BackupProcessor> GetBackupProcessorAsync(DomainId appId) |
|||
{ |
|||
lock (backupProcessors) |
|||
{ |
|||
return backupProcessors.GetOrAdd(appId, async key => |
|||
{ |
|||
var processor = backupFactory(key); |
|||
|
|||
await processor.LoadAsync(default); |
|||
|
|||
return processor; |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -1,26 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Infrastructure; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public interface IBackupJob |
|||
{ |
|||
DomainId Id { get; } |
|||
|
|||
Instant Started { get; } |
|||
|
|||
Instant? Stopped { get; } |
|||
|
|||
int HandledEvents { get; } |
|||
|
|||
int HandledAssets { get; } |
|||
|
|||
JobStatus Status { get; } |
|||
} |
|||
@ -0,0 +1,381 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Runtime.CompilerServices; |
|||
using Microsoft.Extensions.Logging; |
|||
using Squidex.Domain.Apps.Core.Apps; |
|||
using Squidex.Domain.Apps.Entities.Apps.Commands; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Domain.Apps.Events; |
|||
using Squidex.Domain.Apps.Events.Apps; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Commands; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Squidex.Infrastructure.Translations; |
|||
using Squidex.Shared.Users; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed class RestoreJob : IJobRunner |
|||
{ |
|||
public const string TaskName = "restore"; |
|||
public const string ArgUrl = "url"; |
|||
public const string ArgName = "name"; |
|||
|
|||
private readonly IBackupArchiveLocation backupArchiveLocation; |
|||
private readonly IBackupHandlerFactory backupHandlerFactory; |
|||
private readonly ICommandBus commandBus; |
|||
private readonly IEventFormatter eventFormatter; |
|||
private readonly IEventStore eventStore; |
|||
private readonly IEventStreamNames eventStreamNames; |
|||
private readonly IUserResolver userResolver; |
|||
private readonly ILogger<RestoreJob> log; |
|||
|
|||
// Use a run to store all state that is necessary for a single run.
|
|||
private sealed class State |
|||
{ |
|||
public NamedId<DomainId> AppId { get; set; } |
|||
|
|||
public IEnumerable<IBackupHandler> Handlers { get; init; } |
|||
|
|||
public IBackupReader Reader { get; set; } |
|||
|
|||
public RestoreContext Context { get; set; } |
|||
|
|||
public StreamMapper StreamMapper { get; set; } |
|||
|
|||
public string? NewAppName { get; init; } |
|||
|
|||
public Uri Url { get; internal set; } |
|||
} |
|||
|
|||
public string Name => TaskName; |
|||
|
|||
public RestoreJob( |
|||
IBackupArchiveLocation backupArchiveLocation, |
|||
IBackupHandlerFactory backupHandlerFactory, |
|||
ICommandBus commandBus, |
|||
IEventFormatter eventFormatter, |
|||
IEventStore eventStore, |
|||
IEventStreamNames eventStreamNames, |
|||
IUserResolver userResolver, |
|||
ILogger<RestoreJob> log) |
|||
{ |
|||
this.backupArchiveLocation = backupArchiveLocation; |
|||
this.backupHandlerFactory = backupHandlerFactory; |
|||
this.commandBus = commandBus; |
|||
this.eventFormatter = eventFormatter; |
|||
this.eventStore = eventStore; |
|||
this.eventStreamNames = eventStreamNames; |
|||
this.userResolver = userResolver; |
|||
this.log = log; |
|||
} |
|||
|
|||
public static JobRequest BuildRequest(RefToken actor, Uri url, string? appName) |
|||
{ |
|||
return JobRequest.Create( |
|||
actor, |
|||
TaskName, |
|||
new Dictionary<string, string> |
|||
{ |
|||
[ArgUrl] = url.ToString(), |
|||
[ArgName] = appName ?? string.Empty |
|||
}); |
|||
} |
|||
|
|||
public async Task RunAsync(JobRunContext context, |
|||
CancellationToken ct) |
|||
{ |
|||
if (!context.Job.Arguments.TryGetValue(ArgUrl, out var urlValue) || !Uri.TryCreate(urlValue, UriKind.Absolute, out var url)) |
|||
{ |
|||
throw new DomainException("Argument missing."); |
|||
} |
|||
|
|||
var state = new State |
|||
{ |
|||
Handlers = backupHandlerFactory.CreateMany(), |
|||
// Required argument.
|
|||
Url = url, |
|||
// Optional argument.
|
|||
NewAppName = context.Job.Arguments.GetValueOrDefault(ArgName) |
|||
}; |
|||
|
|||
// Use a readable name to describe the job.
|
|||
context.Job.Description = T.Get("job.restore"); |
|||
|
|||
try |
|||
{ |
|||
await context.LogAsync("Started. The restore process has the following steps:"); |
|||
await context.LogAsync(" * Download backup"); |
|||
await context.LogAsync(" * Restore events and attachments."); |
|||
await context.LogAsync(" * Restore all objects like app, schemas and contents"); |
|||
await context.LogAsync(" * Complete the restore operation for all objects"); |
|||
await context.FlushAsync(); |
|||
|
|||
log.LogInformation("Backup with job id {backupId} with from URL '{url}' started.", context.Job.Id, state.Url); |
|||
|
|||
state.Reader = await DownloadAsync(context, state, ct); |
|||
|
|||
await state.Reader.CheckCompatibilityAsync(); |
|||
|
|||
using (Telemetry.Activities.StartActivity("ReadEvents")) |
|||
{ |
|||
await ReadEventsAsync(context, state, ct); |
|||
} |
|||
|
|||
if (state.Context == null) |
|||
{ |
|||
throw new BackupRestoreException("Backup has no event."); |
|||
} |
|||
|
|||
foreach (var handler in state.Handlers) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity($"{handler.GetType().Name}/RestoreAsync")) |
|||
{ |
|||
await handler.RestoreAsync(state.Context, ct); |
|||
} |
|||
|
|||
await context.LogAsync($"Restored {handler.Name}"); |
|||
} |
|||
|
|||
foreach (var handler in state.Handlers) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity($"{handler.GetType().Name}/CompleteRestoreAsync")) |
|||
{ |
|||
await handler.CompleteRestoreAsync(state.Context, state.NewAppName!); |
|||
} |
|||
|
|||
await context.LogAsync($"Completed {handler.Name}"); |
|||
} |
|||
|
|||
// Add the current user to the app, so that the admin can see it and verify integrity.
|
|||
await AssignContributorAsync(context, state); |
|||
|
|||
await context.LogAsync("Completed, Yeah!"); |
|||
|
|||
log.LogInformation("Backup with job id {backupId} from URL '{url}' completed.", context.Job.Id, state.Url); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
// Cleanup as soon as possible.
|
|||
await CleanupAsync(state); |
|||
|
|||
var message = "Failed with internal error."; |
|||
|
|||
switch (ex) |
|||
{ |
|||
case BackupRestoreException backupException: |
|||
message = backupException.Message; |
|||
break; |
|||
case FileNotFoundException fileNotFoundException: |
|||
message = fileNotFoundException.Message; |
|||
break; |
|||
} |
|||
|
|||
await context.LogAsync(message); |
|||
|
|||
log.LogError(ex, "Backup with job id {backupId} from URL '{url}' failed.", context.Job.Id, state.Url); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
private async Task AssignContributorAsync(JobRunContext run, State state) |
|||
{ |
|||
if (run.Actor?.IsUser != true) |
|||
{ |
|||
await run.LogAsync("Current user not assigned because restore was triggered by client."); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
// Add the current user to the app, so that the admin can see it and verify integrity.
|
|||
await PublishAsync(run, state, new AssignContributor |
|||
{ |
|||
ContributorId = run.Actor.Identifier, |
|||
IgnoreActor = true, |
|||
IgnorePlans = true, |
|||
Role = Role.Owner |
|||
}); |
|||
|
|||
await run.LogAsync("Assigned current user."); |
|||
} |
|||
catch (DomainException ex) |
|||
{ |
|||
await run.LogAsync($"Failed to assign contributor: {ex.Message}"); |
|||
} |
|||
} |
|||
|
|||
private Task<CommandContext> PublishAsync(JobRunContext run, State state, AppCommand command) |
|||
{ |
|||
command.Actor = run.Actor; |
|||
|
|||
if (command is IAppCommand appCommand) |
|||
{ |
|||
appCommand.AppId = state.AppId; |
|||
} |
|||
|
|||
return commandBus.PublishAsync(command, default); |
|||
} |
|||
|
|||
private async Task CleanupAsync(State state) |
|||
{ |
|||
if (state.AppId == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
foreach (var handler in state.Handlers) |
|||
{ |
|||
try |
|||
{ |
|||
await handler.CleanupRestoreErrorAsync(state.AppId.Id); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, "Failed to clean up restore."); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task<IBackupReader> DownloadAsync(JobRunContext run, State state, |
|||
CancellationToken ct) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity("Download")) |
|||
{ |
|||
await run.LogAsync("Downloading Backup"); |
|||
|
|||
var reader = await backupArchiveLocation.OpenReaderAsync(state.Url, run.Job.Id, ct); |
|||
|
|||
await run.LogAsync("Downloaded Backup"); |
|||
|
|||
return reader; |
|||
} |
|||
} |
|||
|
|||
private async Task ReadEventsAsync(JobRunContext run, State state, |
|||
CancellationToken ct) |
|||
{ |
|||
// Run batch first, because it is cheaper as it has less items.
|
|||
var events = HandleEventsAsync(run, state, ct).Batch(100, ct).Buffered(2, ct); |
|||
|
|||
var handled = 0; |
|||
|
|||
await Parallel.ForEachAsync(events, new ParallelOptions |
|||
{ |
|||
CancellationToken = ct, |
|||
// The event store cannot insert events in parallel.
|
|||
MaxDegreeOfParallelism = 1, |
|||
}, |
|||
async (batch, ct) => |
|||
{ |
|||
var commits = |
|||
batch.Select(item => |
|||
EventCommit.Create( |
|||
item.Stream, |
|||
item.Offset, |
|||
item.Event, |
|||
eventFormatter)); |
|||
|
|||
await eventStore.AppendUnsafeAsync(commits, ct); |
|||
|
|||
// Just in case we use parallel inserts later.
|
|||
Interlocked.Increment(ref handled); |
|||
|
|||
await run.LogAsync($"Reading {state.Reader.ReadEvents}/{handled} events and {state.Reader.ReadAttachments} attachments completed.", true); |
|||
}); |
|||
} |
|||
|
|||
private async IAsyncEnumerable<(string Stream, long Offset, Envelope<IEvent> Event)> HandleEventsAsync(JobRunContext run, State state, |
|||
[EnumeratorCancellation] CancellationToken ct) |
|||
{ |
|||
var @events = state.Reader.ReadEventsAsync(eventStreamNames, eventFormatter, ct); |
|||
|
|||
await foreach (var (stream, @event) in events.WithCancellation(ct)) |
|||
{ |
|||
var (newStream, handled) = await HandleEventAsync(run, state, stream, @event, ct); |
|||
|
|||
if (handled) |
|||
{ |
|||
var offset = state.StreamMapper.GetStreamOffset(newStream); |
|||
|
|||
yield return (newStream, offset, @event); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task<(string StreamName, bool Handled)> HandleEventAsync(JobRunContext run, State state, string stream, Envelope<IEvent> @event, |
|||
CancellationToken ct = default) |
|||
{ |
|||
if (@event.Payload is AppCreated appCreated) |
|||
{ |
|||
var previousAppId = appCreated.AppId.Id; |
|||
|
|||
if (!string.IsNullOrWhiteSpace(state.NewAppName)) |
|||
{ |
|||
appCreated.Name = state.NewAppName; |
|||
|
|||
state.AppId = NamedId.Of(DomainId.NewGuid(), state.NewAppName); |
|||
} |
|||
else |
|||
{ |
|||
state.AppId = NamedId.Of(DomainId.NewGuid(), appCreated.Name); |
|||
} |
|||
|
|||
await CreateContextAsync(run, state, previousAppId, ct); |
|||
|
|||
state.StreamMapper = new StreamMapper(state.Context); |
|||
} |
|||
|
|||
if (@event.Payload is SquidexEvent { Actor: { } } squidexEvent) |
|||
{ |
|||
if (state.Context.UserMapping.TryMap(squidexEvent.Actor, out var newUser)) |
|||
{ |
|||
squidexEvent.Actor = newUser; |
|||
} |
|||
} |
|||
|
|||
if (@event.Payload is AppEvent appEvent) |
|||
{ |
|||
appEvent.AppId = state.AppId; |
|||
} |
|||
|
|||
var (newStream, id) = state.StreamMapper.Map(stream); |
|||
|
|||
@event.SetAggregateId(id); |
|||
@event.SetRestored(); |
|||
|
|||
foreach (var handler in state.Handlers) |
|||
{ |
|||
if (!await handler.RestoreEventAsync(@event, state.Context, ct)) |
|||
{ |
|||
return (newStream, false); |
|||
} |
|||
} |
|||
|
|||
return (newStream, true); |
|||
} |
|||
|
|||
private async Task CreateContextAsync(JobRunContext run, State state, DomainId previousAppId, |
|||
CancellationToken ct) |
|||
{ |
|||
var userMapping = new UserMapping(run.Actor); |
|||
|
|||
using (Telemetry.Activities.StartActivity("CreateUsers")) |
|||
{ |
|||
await run.LogAsync("Creating Users"); |
|||
|
|||
await userMapping.RestoreAsync(state.Reader, userResolver, ct); |
|||
|
|||
await run.LogAsync("Created Users"); |
|||
} |
|||
|
|||
state.Context = new RestoreContext(state.AppId.Id, userMapping, state.Reader, previousAppId); |
|||
} |
|||
} |
|||
@ -1,57 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Entities.Backup.State; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed partial class RestoreProcessor |
|||
{ |
|||
// Use a run to store all state that is necessary for a single run.
|
|||
private sealed class Run : IDisposable |
|||
{ |
|||
private readonly CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
|||
private readonly CancellationTokenSource cancellationLinked; |
|||
|
|||
public IEnumerable<IBackupHandler> Handlers { get; init; } |
|||
|
|||
public IBackupReader Reader { get; set; } |
|||
|
|||
public RestoreJob Job { get; init; } |
|||
|
|||
public RestoreContext Context { get; set; } |
|||
|
|||
public StreamMapper StreamMapper { get; set; } |
|||
|
|||
public CancellationToken CancellationToken => cancellationLinked.Token; |
|||
|
|||
public Run(CancellationToken ct) |
|||
{ |
|||
cancellationLinked = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellationSource.Token); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Reader?.Dispose(); |
|||
|
|||
cancellationSource.Dispose(); |
|||
cancellationLinked.Dispose(); |
|||
} |
|||
|
|||
public void Cancel() |
|||
{ |
|||
try |
|||
{ |
|||
cancellationSource.Cancel(); |
|||
} |
|||
catch (ObjectDisposedException) |
|||
{ |
|||
// Cancellation token might have been disposed, if the run is completed.
|
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,445 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Runtime.CompilerServices; |
|||
using Microsoft.Extensions.Logging; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Core.Apps; |
|||
using Squidex.Domain.Apps.Entities.Apps.Commands; |
|||
using Squidex.Domain.Apps.Entities.Backup.State; |
|||
using Squidex.Domain.Apps.Events; |
|||
using Squidex.Domain.Apps.Events.Apps; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Commands; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Squidex.Infrastructure.Translations; |
|||
using Squidex.Shared.Users; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public sealed partial class RestoreProcessor |
|||
{ |
|||
private readonly IBackupArchiveLocation backupArchiveLocation; |
|||
private readonly IBackupHandlerFactory backupHandlerFactory; |
|||
private readonly ICommandBus commandBus; |
|||
private readonly IEventFormatter eventFormatter; |
|||
private readonly IEventStore eventStore; |
|||
private readonly IEventStreamNames eventStreamNames; |
|||
private readonly IUserResolver userResolver; |
|||
private readonly ILogger<RestoreProcessor> log; |
|||
private readonly ReentrantScheduler scheduler = new ReentrantScheduler(1); |
|||
private readonly SimpleState<BackupRestoreState> state; |
|||
private Run? currentRun; |
|||
|
|||
public IClock Clock { get; set; } = SystemClock.Instance; |
|||
|
|||
public RestoreProcessor( |
|||
IBackupArchiveLocation backupArchiveLocation, |
|||
IBackupHandlerFactory backupHandlerFactory, |
|||
ICommandBus commandBus, |
|||
IEventFormatter eventFormatter, |
|||
IEventStore eventStore, |
|||
IEventStreamNames eventStreamNames, |
|||
IPersistenceFactory<BackupRestoreState> persistenceFactory, |
|||
IUserResolver userResolver, |
|||
ILogger<RestoreProcessor> log) |
|||
{ |
|||
this.backupArchiveLocation = backupArchiveLocation; |
|||
this.backupHandlerFactory = backupHandlerFactory; |
|||
this.commandBus = commandBus; |
|||
this.eventFormatter = eventFormatter; |
|||
this.eventStore = eventStore; |
|||
this.eventStreamNames = eventStreamNames; |
|||
this.userResolver = userResolver; |
|||
this.log = log; |
|||
|
|||
// Enable locking for the parallel operations that might write stuff.
|
|||
state = new SimpleState<BackupRestoreState>(persistenceFactory, GetType(), "Default", true); |
|||
} |
|||
|
|||
public async Task LoadAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
await state.LoadAsync(ct); |
|||
|
|||
if (state.Value.Job?.Status == JobStatus.Started) |
|||
{ |
|||
state.Value.Job.Status = JobStatus.Failed; |
|||
|
|||
await state.WriteAsync(ct); |
|||
} |
|||
} |
|||
|
|||
public Task RestoreAsync(Uri url, RefToken actor, string? newAppName, |
|||
CancellationToken ct) |
|||
{ |
|||
Guard.NotNull(url); |
|||
Guard.NotNull(actor); |
|||
|
|||
if (!string.IsNullOrWhiteSpace(newAppName)) |
|||
{ |
|||
Guard.ValidSlug(newAppName); |
|||
} |
|||
|
|||
return scheduler.ScheduleAsync(async ct => |
|||
{ |
|||
if (currentRun != null) |
|||
{ |
|||
throw new DomainException(T.Get("backups.restoreRunning")); |
|||
} |
|||
|
|||
state.Value.Job?.EnsureCanStart(); |
|||
|
|||
// Set the current run first to indicate that we are running a rule at the moment.
|
|||
var run = currentRun = new Run(ct) |
|||
{ |
|||
Job = new RestoreJob |
|||
{ |
|||
Id = DomainId.NewGuid(), |
|||
NewAppName = newAppName, |
|||
Actor = actor, |
|||
Started = Clock.GetCurrentInstant(), |
|||
Status = JobStatus.Started, |
|||
Url = url |
|||
}, |
|||
Handlers = backupHandlerFactory.CreateMany() |
|||
}; |
|||
|
|||
state.Value.Job = run.Job; |
|||
try |
|||
{ |
|||
await ProcessAsync(run, run.CancellationToken); |
|||
} |
|||
finally |
|||
{ |
|||
// Unset the run to indicate that we are done.
|
|||
currentRun.Dispose(); |
|||
currentRun = null; |
|||
} |
|||
}, ct); |
|||
} |
|||
|
|||
private async Task ProcessAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity("RestoreBackup")) |
|||
{ |
|||
try |
|||
{ |
|||
await state.WriteAsync(run.CancellationToken); |
|||
|
|||
await LogAsync(run, "Started. The restore process has the following steps:"); |
|||
await LogAsync(run, " * Download backup"); |
|||
await LogAsync(run, " * Restore events and attachments."); |
|||
await LogAsync(run, " * Restore all objects like app, schemas and contents"); |
|||
await LogAsync(run, " * Complete the restore operation for all objects"); |
|||
await LogFlushAsync(run); |
|||
|
|||
log.LogInformation("Backup with job id {backupId} with from URL '{url}' started.", run.Job.Id, run.Job.Url); |
|||
|
|||
run.Reader = await DownloadAsync(run, ct); |
|||
|
|||
await run.Reader.CheckCompatibilityAsync(); |
|||
|
|||
using (Telemetry.Activities.StartActivity("ReadEvents")) |
|||
{ |
|||
await ReadEventsAsync(run, ct); |
|||
} |
|||
|
|||
if (run.Context == null) |
|||
{ |
|||
throw new BackupRestoreException("Backup has no event."); |
|||
} |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity($"{handler.GetType().Name}/RestoreAsync")) |
|||
{ |
|||
await handler.RestoreAsync(run.Context, ct); |
|||
} |
|||
|
|||
await LogAsync(run, $"Restored {handler.Name}"); |
|||
} |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity($"{handler.GetType().Name}/CompleteRestoreAsync")) |
|||
{ |
|||
await handler.CompleteRestoreAsync(run.Context, run.Job.NewAppName!); |
|||
} |
|||
|
|||
await LogAsync(run, $"Completed {handler.Name}"); |
|||
} |
|||
|
|||
// Add the current user to the app, so that the admin can see it and verify integrity.
|
|||
await AssignContributorAsync(run); |
|||
|
|||
await SetStatusAsync(run, JobStatus.Completed, "Completed, Yeah!"); |
|||
|
|||
log.LogInformation("Backup with job id {backupId} from URL '{url}' completed.", run.Job.Id, run.Job.Url); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
// Cleanup as soon as possible.
|
|||
await CleanupAsync(run); |
|||
|
|||
var message = "Failed with internal error."; |
|||
|
|||
switch (ex) |
|||
{ |
|||
case BackupRestoreException backupException: |
|||
message = backupException.Message; |
|||
break; |
|||
case FileNotFoundException fileNotFoundException: |
|||
message = fileNotFoundException.Message; |
|||
break; |
|||
} |
|||
|
|||
await SetStatusAsync(run, JobStatus.Failed, message); |
|||
|
|||
log.LogError(ex, "Backup with job id {backupId} from URL '{url}' failed.", run.Job.Id, run.Job.Url); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task AssignContributorAsync(Run run) |
|||
{ |
|||
if (run.Job.Actor?.IsUser != true) |
|||
{ |
|||
await LogAsync(run, "Current user not assigned because restore was triggered by client."); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
// Add the current user to the app, so that the admin can see it and verify integrity.
|
|||
await PublishAsync(run, new AssignContributor |
|||
{ |
|||
ContributorId = run.Job.Actor.Identifier, |
|||
IgnoreActor = true, |
|||
IgnorePlans = true, |
|||
Role = Role.Owner |
|||
}); |
|||
|
|||
await LogAsync(run, "Assigned current user."); |
|||
} |
|||
catch (DomainException ex) |
|||
{ |
|||
await LogAsync(run, $"Failed to assign contributor: {ex.Message}"); |
|||
} |
|||
} |
|||
|
|||
private Task PublishAsync(Run run, AppCommand command) |
|||
{ |
|||
command.Actor = run.Job.Actor; |
|||
|
|||
if (command is IAppCommand appCommand) |
|||
{ |
|||
appCommand.AppId = run.Job.AppId; |
|||
} |
|||
|
|||
return commandBus.PublishAsync(command, default); |
|||
} |
|||
|
|||
private async Task CleanupAsync(Run run) |
|||
{ |
|||
if (run.Job.AppId == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
try |
|||
{ |
|||
await handler.CleanupRestoreErrorAsync(run.Job.AppId.Id); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, "Failed to clean up restore."); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task<IBackupReader> DownloadAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
using (Telemetry.Activities.StartActivity("Download")) |
|||
{ |
|||
await LogAsync(run, "Downloading Backup"); |
|||
|
|||
var reader = await backupArchiveLocation.OpenReaderAsync(run.Job.Url, run.Job.Id, ct); |
|||
|
|||
await LogAsync(run, "Downloaded Backup"); |
|||
|
|||
return reader; |
|||
} |
|||
} |
|||
|
|||
private async Task ReadEventsAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
// Run batch first, because it is cheaper as it has less items.
|
|||
var events = HandleEventsAsync(run, ct).Batch(100, ct).Buffered(2, ct); |
|||
|
|||
var handled = 0; |
|||
|
|||
await Parallel.ForEachAsync(events, new ParallelOptions |
|||
{ |
|||
CancellationToken = ct, |
|||
// The event store cannot insert events in parallel.
|
|||
MaxDegreeOfParallelism = 1, |
|||
}, |
|||
async (batch, ct) => |
|||
{ |
|||
var commits = |
|||
batch.Select(item => |
|||
EventCommit.Create( |
|||
item.Stream, |
|||
item.Offset, |
|||
item.Event, |
|||
eventFormatter)); |
|||
|
|||
await eventStore.AppendUnsafeAsync(commits, ct); |
|||
|
|||
// Just in case we use parallel inserts later.
|
|||
Interlocked.Increment(ref handled); |
|||
|
|||
await LogAsync(run, $"Reading {run.Reader.ReadEvents}/{handled} events and {run.Reader.ReadAttachments} attachments completed.", true); |
|||
}); |
|||
} |
|||
|
|||
private async IAsyncEnumerable<(string Stream, long Offset, Envelope<IEvent> Event)> HandleEventsAsync(Run run, |
|||
[EnumeratorCancellation] CancellationToken ct) |
|||
{ |
|||
var @events = run.Reader.ReadEventsAsync(eventStreamNames, eventFormatter, ct); |
|||
|
|||
await foreach (var (stream, @event) in events.WithCancellation(ct)) |
|||
{ |
|||
var (newStream, handled) = await HandleEventAsync(run, stream, @event, ct); |
|||
|
|||
if (handled) |
|||
{ |
|||
var offset = run.StreamMapper.GetStreamOffset(newStream); |
|||
|
|||
yield return (newStream, offset, @event); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task<(string StreamName, bool Handled)> HandleEventAsync(Run run, string stream, Envelope<IEvent> @event, |
|||
CancellationToken ct = default) |
|||
{ |
|||
if (@event.Payload is AppCreated appCreated) |
|||
{ |
|||
var previousAppId = appCreated.AppId.Id; |
|||
|
|||
if (!string.IsNullOrWhiteSpace(run.Job.NewAppName)) |
|||
{ |
|||
appCreated.Name = run.Job.NewAppName; |
|||
|
|||
run.Job.AppId = NamedId.Of(DomainId.NewGuid(), run.Job.NewAppName); |
|||
} |
|||
else |
|||
{ |
|||
run.Job.AppId = NamedId.Of(DomainId.NewGuid(), appCreated.Name); |
|||
} |
|||
|
|||
await CreateContextAsync(run, previousAppId, ct); |
|||
|
|||
run.StreamMapper = new StreamMapper(run.Context); |
|||
} |
|||
|
|||
if (@event.Payload is SquidexEvent { Actor: { } } squidexEvent) |
|||
{ |
|||
if (run.Context.UserMapping.TryMap(squidexEvent.Actor, out var newUser)) |
|||
{ |
|||
squidexEvent.Actor = newUser; |
|||
} |
|||
} |
|||
|
|||
if (@event.Payload is AppEvent appEvent) |
|||
{ |
|||
appEvent.AppId = run.Job.AppId; |
|||
} |
|||
|
|||
var (newStream, id) = run.StreamMapper.Map(stream); |
|||
|
|||
@event.SetAggregateId(id); |
|||
@event.SetRestored(); |
|||
|
|||
foreach (var handler in run.Handlers) |
|||
{ |
|||
if (!await handler.RestoreEventAsync(@event, run.Context, ct)) |
|||
{ |
|||
return (newStream, false); |
|||
} |
|||
} |
|||
|
|||
return (newStream, true); |
|||
} |
|||
|
|||
private async Task CreateContextAsync(Run run, DomainId previousAppId, |
|||
CancellationToken ct) |
|||
{ |
|||
var userMapping = new UserMapping(run.Job.Actor); |
|||
|
|||
using (Telemetry.Activities.StartActivity("CreateUsers")) |
|||
{ |
|||
await LogAsync(run, "Creating Users"); |
|||
|
|||
await userMapping.RestoreAsync(run.Reader, userResolver, ct); |
|||
|
|||
await LogAsync(run, "Created Users"); |
|||
} |
|||
|
|||
run.Context = new RestoreContext(run.Job.AppId.Id, userMapping, run.Reader, previousAppId); |
|||
} |
|||
|
|||
private Task SetStatusAsync(Run run, JobStatus status, string message) |
|||
{ |
|||
var now = Clock.GetCurrentInstant(); |
|||
|
|||
run.Job.Status = status; |
|||
|
|||
if (status == JobStatus.Failed || status == JobStatus.Completed) |
|||
{ |
|||
run.Job.Stopped = now; |
|||
} |
|||
else if (status == JobStatus.Started) |
|||
{ |
|||
run.Job.Started = now; |
|||
} |
|||
|
|||
run.Job.Log.Add($"{now}: {message}"); |
|||
|
|||
return state.WriteAsync(default); |
|||
} |
|||
|
|||
private Task LogAsync(Run run, string message, bool replace = false) |
|||
{ |
|||
var now = Clock.GetCurrentInstant(); |
|||
|
|||
if (replace && run.Job.Log.Count > 0) |
|||
{ |
|||
run.Job.Log[^1] = $"{now}: {message}"; |
|||
} |
|||
else |
|||
{ |
|||
run.Job.Log.Add($"{now}: {message}"); |
|||
} |
|||
|
|||
return state.WriteAsync(100, run.CancellationToken); |
|||
} |
|||
|
|||
private Task LogFlushAsync(Run run) |
|||
{ |
|||
return state.WriteAsync(run.CancellationToken); |
|||
} |
|||
} |
|||
@ -1,29 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Translations; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup.State; |
|||
|
|||
public sealed class BackupState |
|||
{ |
|||
public List<BackupJob> Jobs { get; set; } = []; |
|||
|
|||
public void EnsureCanStart() |
|||
{ |
|||
if (Jobs.Exists(x => x.Status == JobStatus.Started)) |
|||
{ |
|||
throw new DomainException(T.Get("backups.alreadyRunning")); |
|||
} |
|||
|
|||
if (Jobs.Count >= 10) |
|||
{ |
|||
throw new DomainException(T.Get("backups.maxReached", new { max = 10 })); |
|||
} |
|||
} |
|||
} |
|||
@ -1,43 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Translations; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup.State; |
|||
|
|||
public sealed class RestoreJob : IRestoreJob |
|||
{ |
|||
public string AppName { get; set; } |
|||
|
|||
public DomainId Id { get; set; } |
|||
|
|||
public NamedId<DomainId> AppId { get; set; } |
|||
|
|||
public RefToken Actor { get; set; } |
|||
|
|||
public Uri Url { get; set; } |
|||
|
|||
public Instant Started { get; set; } |
|||
|
|||
public Instant? Stopped { get; set; } |
|||
|
|||
public List<string> Log { get; set; } = []; |
|||
|
|||
public JobStatus Status { get; set; } |
|||
|
|||
public string? NewAppName { get; set; } |
|||
|
|||
public void EnsureCanStart() |
|||
{ |
|||
if (Status == JobStatus.Started) |
|||
{ |
|||
throw new DomainException(T.Get("backups.restoreRunning")); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,93 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Domain.Apps.Core.Apps; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Infrastructure.Translations; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public sealed class DefaultJobService : IJobService, IDeleter |
|||
{ |
|||
private readonly IMessageBus messaging; |
|||
private readonly IEnumerable<IJobRunner> runners; |
|||
private readonly IPersistenceFactory<JobsState> persistence; |
|||
|
|||
public DefaultJobService(IMessageBus messaging, IEnumerable<IJobRunner> runners, IPersistenceFactory<JobsState> persistence) |
|||
{ |
|||
this.messaging = messaging; |
|||
this.runners = runners; |
|||
this.persistence = persistence; |
|||
} |
|||
|
|||
Task IDeleter.DeleteAppAsync(App app, CancellationToken ct) |
|||
{ |
|||
return messaging.PublishAsync(new JobClear(app.Id), null, ct); |
|||
} |
|||
|
|||
public async Task DownloadAsync(Job job, Stream stream, |
|||
CancellationToken ct = default) |
|||
{ |
|||
Guard.NotNull(job); |
|||
Guard.NotNull(stream); |
|||
|
|||
if (job.File == null || job.Status != JobStatus.Completed) |
|||
{ |
|||
throw new InvalidOperationException("Invalid job."); |
|||
} |
|||
|
|||
var runner = runners.FirstOrDefault(x => x.Name == job.TaskName) ?? |
|||
throw new InvalidOperationException("Invalid job."); |
|||
|
|||
await runner.DownloadAsync(job, stream, ct); |
|||
} |
|||
|
|||
public async Task StartAsync(DomainId ownerId, JobRequest request, |
|||
CancellationToken ct = default) |
|||
{ |
|||
var runner = runners.FirstOrDefault(x => x.Name == request.TaskName) ?? |
|||
throw new DomainException(T.Get("jobs.invalidTaskName")); |
|||
|
|||
var state = await GetStateAsync(ownerId, ct); |
|||
|
|||
state.EnsureCanStart(runner); |
|||
|
|||
await messaging.PublishAsync(new JobStart(ownerId, request), null, ct); |
|||
} |
|||
|
|||
public Task CancelAsync(DomainId ownerId, string? taskName = null, |
|||
CancellationToken ct = default) |
|||
{ |
|||
return messaging.PublishAsync(new JobCancel(ownerId, taskName), null, ct); |
|||
} |
|||
|
|||
public Task DeleteJobAsync(DomainId ownerId, DomainId jobId, |
|||
CancellationToken ct = default) |
|||
{ |
|||
return messaging.PublishAsync(new JobDelete(ownerId, jobId), null, ct); |
|||
} |
|||
|
|||
public async Task<List<Job>> GetJobsAsync(DomainId ownerId, |
|||
CancellationToken ct = default) |
|||
{ |
|||
var state = await GetStateAsync(ownerId, ct); |
|||
|
|||
return state.Jobs; |
|||
} |
|||
|
|||
private async Task<JobsState> GetStateAsync(DomainId ownerId, |
|||
CancellationToken ct = default) |
|||
{ |
|||
var state = new SimpleState<JobsState>(persistence, typeof(JobProcessor), ownerId); |
|||
|
|||
await state.LoadAsync(ct); |
|||
|
|||
return state.Value; |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public interface IJobRunner |
|||
{ |
|||
static string TaskName { get; } |
|||
|
|||
string Name { get; } |
|||
|
|||
int MaxJobs => 3; |
|||
|
|||
Task RunAsync(JobRunContext context, |
|||
CancellationToken ct); |
|||
|
|||
Task DownloadAsync(Job job, Stream stream, |
|||
CancellationToken ct) |
|||
{ |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
Task CleanupAsync(Job job) |
|||
{ |
|||
return Task.CompletedTask; |
|||
} |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Collections; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public sealed class Job |
|||
{ |
|||
public DomainId Id { get; init; } |
|||
|
|||
public Instant Started { get; set; } |
|||
|
|||
public Instant? Stopped { get; set; } |
|||
|
|||
public string TaskName { get; init; } |
|||
|
|||
public string Description { get; set; } |
|||
|
|||
public JobFile? File { get; set; } |
|||
|
|||
public ReadonlyDictionary<string, string> Arguments { get; init; } |
|||
|
|||
public List<JobLogMessage> Log { get; set; } = []; |
|||
|
|||
public JobStatus Status { get; set; } |
|||
} |
|||
@ -0,0 +1,207 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.Logging; |
|||
using NodaTime; |
|||
using Squidex.Caching; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Squidex.Infrastructure.Translations; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public sealed class JobProcessor |
|||
{ |
|||
private readonly DomainId ownerId; |
|||
private readonly IEnumerable<IJobRunner> runners; |
|||
private readonly ILocalCache localCache; |
|||
private readonly ILogger<JobProcessor> log; |
|||
private readonly SimpleState<JobsState> state; |
|||
private readonly ReentrantScheduler scheduler = new ReentrantScheduler(1); |
|||
private JobRunContext? currentRun; |
|||
|
|||
public IClock Clock { get; init; } = SystemClock.Instance; |
|||
|
|||
public JobProcessor(DomainId ownerId, |
|||
IEnumerable<IJobRunner> runners, |
|||
ILocalCache localCache, |
|||
IPersistenceFactory<JobsState> persistenceFactory, |
|||
ILogger<JobProcessor> log) |
|||
{ |
|||
this.ownerId = ownerId; |
|||
this.runners = runners; |
|||
this.localCache = localCache; |
|||
this.log = log; |
|||
|
|||
state = new SimpleState<JobsState>(persistenceFactory, GetType(), ownerId); |
|||
} |
|||
|
|||
public async Task LoadAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
await state.LoadAsync(ct); |
|||
|
|||
if (state.Value.Jobs.RemoveAll(x => x.Stopped == null) > 0) |
|||
{ |
|||
// This should actually never happen, so we log with warning.
|
|||
log.LogWarning("Removed unfinished backups for owner {ownerId} after start.", ownerId); |
|||
|
|||
await state.WriteAsync(ct); |
|||
} |
|||
} |
|||
|
|||
public Task DeleteAsync(DomainId jobId) |
|||
{ |
|||
return scheduler.ScheduleAsync(async _ => |
|||
{ |
|||
log.LogInformation("Clearing jobs for owner {ownerId}.", ownerId); |
|||
|
|||
var job = state.Value.Jobs.Find(x => x.Id == jobId); |
|||
|
|||
if (job == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var runner = runners.FirstOrDefault(x => x.Name == job.TaskName); |
|||
|
|||
if (runner != null) |
|||
{ |
|||
await runner.CleanupAsync(job); |
|||
} |
|||
|
|||
await state.UpdateAsync(state => state.Jobs.RemoveAll(x => x.Id == jobId) > 0, ct: default); |
|||
}, default); |
|||
} |
|||
|
|||
public Task ClearAsync() |
|||
{ |
|||
return scheduler.ScheduleAsync(async _ => |
|||
{ |
|||
log.LogInformation("Clearing jobs for owner {ownerId}.", ownerId); |
|||
|
|||
foreach (var job in state.Value.Jobs) |
|||
{ |
|||
var runner = runners.FirstOrDefault(x => x.Name == job.TaskName); |
|||
|
|||
if (runner != null) |
|||
{ |
|||
await runner.CleanupAsync(job); |
|||
} |
|||
} |
|||
|
|||
await state.ClearAsync(default); |
|||
}, default); |
|||
} |
|||
|
|||
public Task CancelAsync(string? taskName) |
|||
{ |
|||
// Ensure that only one thread is accessing the current state at a time.
|
|||
return scheduler.Schedule(() => |
|||
{ |
|||
if (taskName == null || currentRun?.Job.TaskName == taskName) |
|||
{ |
|||
currentRun?.Cancel(); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
public Task RunAsync(JobRequest request, |
|||
CancellationToken ct) |
|||
{ |
|||
return scheduler.ScheduleAsync(async ct => |
|||
{ |
|||
if (currentRun != null) |
|||
{ |
|||
throw new DomainException(T.Get("jobs.alreadyRunning")); |
|||
} |
|||
|
|||
var runner = runners.FirstOrDefault(x => x.Name == request.TaskName) ?? |
|||
throw new DomainException(T.Get("jobs.invalidTaskName")); |
|||
|
|||
state.Value.EnsureCanStart(runner); |
|||
|
|||
// Set the current run first to indicate that we are running a rule at the moment.
|
|||
var context = currentRun = new JobRunContext(state, Clock, ct) |
|||
{ |
|||
Actor = request.Actor, |
|||
Job = new Job |
|||
{ |
|||
Id = DomainId.NewGuid(), |
|||
Arguments = request.Arguments, |
|||
Description = request.TaskName, |
|||
Started = default, |
|||
Status = JobStatus.Created, |
|||
TaskName = request.TaskName |
|||
}, |
|||
OwnerId = ownerId |
|||
}; |
|||
|
|||
log.LogInformation("Starting new backup with backup id '{backupId}' for owner {ownerId}.", context.Job.Id, ownerId); |
|||
|
|||
state.Value.Jobs.Insert(0, context.Job); |
|||
try |
|||
{ |
|||
await ProcessAsync(context, runner, context.CancellationToken); |
|||
} |
|||
finally |
|||
{ |
|||
// Unset the run to indicate that we are done.
|
|||
currentRun.Dispose(); |
|||
currentRun = null; |
|||
} |
|||
}, ct); |
|||
} |
|||
|
|||
private async Task ProcessAsync(JobRunContext context, IJobRunner runner, |
|||
CancellationToken ct) |
|||
{ |
|||
try |
|||
{ |
|||
await SetStatusAsync(context, JobStatus.Started); |
|||
|
|||
using (localCache.StartContext()) |
|||
{ |
|||
await runner.RunAsync(context, ct); |
|||
} |
|||
|
|||
await SetStatusAsync(context, JobStatus.Completed); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
await SetStatusAsync(context, JobStatus.Cancelled); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, "Failed to run job with ID {jobId}.", context.Job.Id); |
|||
|
|||
await SetStatusAsync(context, JobStatus.Failed); |
|||
} |
|||
} |
|||
|
|||
private Task SetStatusAsync(JobRunContext context, JobStatus status) |
|||
{ |
|||
var now = Clock.GetCurrentInstant(); |
|||
|
|||
return state.UpdateAsync(_ => |
|||
{ |
|||
context.Job.Status = status; |
|||
|
|||
if (status == JobStatus.Started) |
|||
{ |
|||
context.Job.Started = now; |
|||
} |
|||
else if (status != JobStatus.Created) |
|||
{ |
|||
context.Job.Stopped = now; |
|||
} |
|||
|
|||
return true; |
|||
}, ct: default); |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Collections; |
|||
|
|||
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
|
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public record struct JobRequest(RefToken Actor, string TaskName, ReadonlyDictionary<string, string> Arguments) |
|||
{ |
|||
public static JobRequest Create(RefToken actor, string taskName, Dictionary<string, string>? arguments = null) |
|||
{ |
|||
var args = arguments?.ToReadonlyDictionary() ?? ReadonlyDictionary.Empty<string, string>(); |
|||
|
|||
return new JobRequest(actor, taskName, args); |
|||
} |
|||
} |
|||
@ -0,0 +1,75 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.States; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public sealed class JobRunContext : IDisposable |
|||
{ |
|||
private readonly CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
|||
private readonly CancellationTokenSource cancellationLinked; |
|||
private readonly SimpleState<JobsState> state; |
|||
private readonly IClock clock; |
|||
|
|||
required public RefToken Actor { get; init; } |
|||
|
|||
required public Job Job { get; init; } |
|||
|
|||
required public DomainId OwnerId { get; init; } |
|||
|
|||
public CancellationToken CancellationToken => cancellationLinked.Token; |
|||
|
|||
public JobRunContext(SimpleState<JobsState> state, IClock clock, CancellationToken ct) |
|||
{ |
|||
this.state = state; |
|||
this.clock = clock; |
|||
|
|||
cancellationLinked = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellationSource.Token); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
cancellationSource.Dispose(); |
|||
cancellationLinked.Dispose(); |
|||
} |
|||
|
|||
public Task LogAsync(string message, bool replace = false) |
|||
{ |
|||
var item = new JobLogMessage(clock.GetCurrentInstant(), message); |
|||
|
|||
if (replace && Job.Log.Count > 0) |
|||
{ |
|||
Job.Log[^1] = item; |
|||
} |
|||
else |
|||
{ |
|||
Job.Log.Add(item); |
|||
} |
|||
|
|||
return state.WriteAsync(100, CancellationToken); |
|||
} |
|||
|
|||
public Task FlushAsync() |
|||
{ |
|||
return state.WriteAsync(CancellationToken); |
|||
} |
|||
|
|||
public void Cancel() |
|||
{ |
|||
try |
|||
{ |
|||
cancellationSource.Cancel(); |
|||
} |
|||
catch (ObjectDisposedException) |
|||
{ |
|||
// Cancellation token might have been disposed, if the run is completed.
|
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,79 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public sealed class JobWorker : |
|||
IMessageHandler<JobStart>, |
|||
IMessageHandler<JobDelete>, |
|||
IMessageHandler<JobCancel>, |
|||
IMessageHandler<JobClear> |
|||
{ |
|||
private readonly Dictionary<DomainId, Task<JobProcessor>> processors = []; |
|||
private readonly Func<DomainId, JobProcessor> processorFactory; |
|||
|
|||
public JobWorker(IServiceProvider serviceProvider) |
|||
{ |
|||
var objectFactory = ActivatorUtilities.CreateFactory(typeof(JobProcessor), [typeof(DomainId)]); |
|||
|
|||
processorFactory = key => |
|||
{ |
|||
return (JobProcessor)objectFactory(serviceProvider, new object[] { key }); |
|||
}; |
|||
} |
|||
|
|||
public async Task HandleAsync(JobStart message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetJobProcessorAsync(message.OwnerId); |
|||
|
|||
await processor.RunAsync(message.Request, ct); |
|||
} |
|||
|
|||
public async Task HandleAsync(JobCancel message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetJobProcessorAsync(message.OwnerId); |
|||
|
|||
await processor.CancelAsync(message.TaskName); |
|||
} |
|||
|
|||
public async Task HandleAsync(JobDelete message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetJobProcessorAsync(message.OwnerId); |
|||
|
|||
await processor.DeleteAsync(message.JobId); |
|||
} |
|||
|
|||
public async Task HandleAsync(JobClear message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetJobProcessorAsync(message.OwnerId); |
|||
|
|||
await processor.ClearAsync(); |
|||
} |
|||
|
|||
private Task<JobProcessor> GetJobProcessorAsync(DomainId appId) |
|||
{ |
|||
lock (processors) |
|||
{ |
|||
return processors.GetOrAdd(appId, async key => |
|||
{ |
|||
var processor = processorFactory(key); |
|||
|
|||
await processor.LoadAsync(default); |
|||
|
|||
return processor; |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Translations; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public sealed class JobsState |
|||
{ |
|||
public List<Job> Jobs { get; set; } = []; |
|||
|
|||
public void EnsureCanStart(IJobRunner runner) |
|||
{ |
|||
if (Jobs.Exists(x => x.Status == JobStatus.Started)) |
|||
{ |
|||
throw new DomainException(T.Get("jobs.alreadyRunning")); |
|||
} |
|||
|
|||
var max = runner.MaxJobs; |
|||
|
|||
var jobs = Jobs.Where(x => x.TaskName == runner.Name && x.File == null).Skip(max - 1).ToList(); |
|||
|
|||
foreach (var job in jobs) |
|||
{ |
|||
Jobs.Remove(job); |
|||
} |
|||
|
|||
if (Jobs.Count(x => x.TaskName == runner.Name) >= max) |
|||
{ |
|||
throw new DomainException(T.Get("jobs.maxReached", new { max })); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,206 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.Logging; |
|||
using Squidex.Domain.Apps.Core.HandleRules; |
|||
using Squidex.Domain.Apps.Core.Rules; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Domain.Apps.Entities.Rules.Repositories; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.Translations; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Rules.Runner; |
|||
|
|||
public sealed class RuleRunnerJob : IJobRunner |
|||
{ |
|||
public const string TaskName = "run-rule"; |
|||
public const string ArgRuleId = "ruleId"; |
|||
public const string ArgSnapshot = "snapshots"; |
|||
|
|||
private const int MaxErrors = 10; |
|||
private readonly IAppProvider appProvider; |
|||
private readonly IEventFormatter eventFormatter; |
|||
private readonly IEventStore eventStore; |
|||
private readonly IRuleEventRepository ruleEventRepository; |
|||
private readonly IRuleService ruleService; |
|||
private readonly IRuleUsageTracker ruleUsageTracker; |
|||
private readonly ILogger<RuleRunnerJob> log; |
|||
|
|||
public string Name => TaskName; |
|||
|
|||
public RuleRunnerJob( |
|||
IAppProvider appProvider, |
|||
IEventFormatter eventFormatter, |
|||
IEventStore eventStore, |
|||
IRuleEventRepository ruleEventRepository, |
|||
IRuleService ruleService, |
|||
IRuleUsageTracker ruleUsageTracker, |
|||
ILogger<RuleRunnerJob> log) |
|||
{ |
|||
this.appProvider = appProvider; |
|||
this.eventStore = eventStore; |
|||
this.eventFormatter = eventFormatter; |
|||
this.ruleEventRepository = ruleEventRepository; |
|||
this.ruleService = ruleService; |
|||
this.ruleUsageTracker = ruleUsageTracker; |
|||
this.log = log; |
|||
} |
|||
|
|||
public static DomainId? GetRunningRuleId(Job job) |
|||
{ |
|||
if (job.TaskName != TaskName || job.Status != JobStatus.Started) |
|||
{ |
|||
return null; |
|||
} |
|||
|
|||
if (!job.Arguments.TryGetValue(ArgRuleId, out var ruleId)) |
|||
{ |
|||
return null; |
|||
} |
|||
|
|||
return DomainId.Create(ruleId); |
|||
} |
|||
|
|||
public static JobRequest BuildRequest(RefToken actor, DomainId ruleId, bool snapshot) |
|||
{ |
|||
return JobRequest.Create( |
|||
actor, |
|||
TaskName, |
|||
new Dictionary<string, string> |
|||
{ |
|||
[ArgRuleId] = ruleId.ToString(), |
|||
[ArgSnapshot] = snapshot.ToString() |
|||
}); |
|||
} |
|||
|
|||
public async Task RunAsync(JobRunContext context, |
|||
CancellationToken ct) |
|||
{ |
|||
if (!context.Job.Arguments.TryGetValue(ArgRuleId, out var ruleId)) |
|||
{ |
|||
throw new DomainException("Argument missing."); |
|||
} |
|||
|
|||
var rule = await appProvider.GetRuleAsync(context.OwnerId, DomainId.Create(ruleId), ct) |
|||
?? throw new DomainObjectNotFoundException(ruleId); |
|||
|
|||
var fromSnapshot = string.Equals(context.Job.Arguments.GetValueOrDefault(ArgSnapshot), "true", StringComparison.OrdinalIgnoreCase); |
|||
|
|||
// Use a readable name to describe the job.
|
|||
SetDescription(context, rule, fromSnapshot); |
|||
|
|||
// Also run disabled rules, because we want to enable rules to be only used with manual trigger.
|
|||
var ruleContext = new RuleContext |
|||
{ |
|||
AppId = rule.AppId, |
|||
IncludeStale = true, |
|||
IncludeSkipped = true, |
|||
Rule = rule, |
|||
}; |
|||
|
|||
if (fromSnapshot && ruleService.CanCreateSnapshotEvents(rule)) |
|||
{ |
|||
await EnqueueFromSnapshotsAsync(ruleContext, ct); |
|||
} |
|||
else |
|||
{ |
|||
await EnqueueFromEventsAsync(context, ruleContext, ct); |
|||
} |
|||
} |
|||
|
|||
private static void SetDescription(JobRunContext run, Rule rule, bool fromSnapshot) |
|||
{ |
|||
if (!string.IsNullOrWhiteSpace(rule.Name)) |
|||
{ |
|||
var key = fromSnapshot ? |
|||
"job.ruleRunNamedSnapshot" : |
|||
"job.ruleRunName"; |
|||
|
|||
run.Job.Description = T.Get(key, new { name = rule.Name }); |
|||
} |
|||
else |
|||
{ |
|||
var key = fromSnapshot ? |
|||
"job.ruleRunSnapshot" : |
|||
"job.ruleRun"; |
|||
|
|||
run.Job.Description = T.Get(key); |
|||
} |
|||
} |
|||
|
|||
private async Task EnqueueFromSnapshotsAsync(RuleContext context, |
|||
CancellationToken ct) |
|||
{ |
|||
// We collect errors and allow a few erors before we throw an exception.
|
|||
var errors = 0; |
|||
|
|||
// Write in batches of 100 items for better performance. Using completes the last write.
|
|||
await using var batch = new RuleQueueWriter(ruleEventRepository, ruleUsageTracker, null); |
|||
|
|||
await foreach (var result in ruleService.CreateSnapshotJobsAsync(context, ct)) |
|||
{ |
|||
await batch.WriteAsync(result); |
|||
|
|||
if (result.EnrichmentError != null) |
|||
{ |
|||
errors++; |
|||
|
|||
// We accept a few errors and stop the process if there are too many errors.
|
|||
if (errors >= MaxErrors) |
|||
{ |
|||
throw result.EnrichmentError; |
|||
} |
|||
|
|||
log.LogWarning(result.EnrichmentError, "Failed to run rule with ID {ruleId}, continue with next job.", result.Rule?.Id); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task EnqueueFromEventsAsync(JobRunContext run, RuleContext context, |
|||
CancellationToken ct) |
|||
{ |
|||
// We collect errors and allow a few erors before we throw an exception.
|
|||
var errors = 0; |
|||
|
|||
// Write in batches of 100 items for better performance. Using completes the last write.
|
|||
await using var batch = new RuleQueueWriter(ruleEventRepository, ruleUsageTracker, null); |
|||
|
|||
// Use a prefix query so that the storage can use an index for the query.
|
|||
var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)\\-{run.OwnerId}"); |
|||
|
|||
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct)) |
|||
{ |
|||
var @event = eventFormatter.ParseIfKnown(storedEvent); |
|||
|
|||
if (@event == null) |
|||
{ |
|||
continue; |
|||
} |
|||
|
|||
await foreach (var result in ruleService.CreateJobsAsync(@event, context.ToRulesContext(), ct)) |
|||
{ |
|||
await batch.WriteAsync(result); |
|||
|
|||
if (result.EnrichmentError != null) |
|||
{ |
|||
errors++; |
|||
|
|||
// We accept a few errors and stop the process if there are too many errors.
|
|||
if (errors >= MaxErrors) |
|||
{ |
|||
throw result.EnrichmentError; |
|||
} |
|||
|
|||
log.LogWarning(result.EnrichmentError, "Failed to run rule with ID {ruleId}, continue with next job.", result.Rule?.Id); |
|||
} |
|||
} |
|||
} |
|||
|
|||
await batch.FlushAsync(); |
|||
} |
|||
} |
|||
@ -1,298 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.Logging; |
|||
using Squidex.Caching; |
|||
using Squidex.Domain.Apps.Core.HandleRules; |
|||
using Squidex.Domain.Apps.Entities.Rules.Repositories; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Squidex.Infrastructure.Translations; |
|||
using TaskHelper = Squidex.Infrastructure.Tasks.TaskExtensions; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Rules.Runner; |
|||
|
|||
public sealed class RuleRunnerProcessor |
|||
{ |
|||
private const int MaxErrors = 10; |
|||
private readonly IAppProvider appProvider; |
|||
private readonly IEventFormatter eventFormatter; |
|||
private readonly IEventStore eventStore; |
|||
private readonly ILocalCache localCache; |
|||
private readonly IRuleEventRepository ruleEventRepository; |
|||
private readonly IRuleService ruleService; |
|||
private readonly IRuleUsageTracker ruleUsageTracker; |
|||
private readonly ILogger<RuleRunnerProcessor> log; |
|||
private readonly SimpleState<RuleRunnerState> state; |
|||
private readonly ReentrantScheduler scheduler = new ReentrantScheduler(1); |
|||
private readonly DomainId appId; |
|||
private Run? currentRun; |
|||
|
|||
// Use a run to store all state that is necessary for a single run.
|
|||
private sealed class Run : IDisposable |
|||
{ |
|||
private readonly CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
|||
private readonly CancellationTokenSource cancellationLinked; |
|||
|
|||
public RuleRunnerState Job { get; init; } |
|||
|
|||
public RuleContext Context { get; set; } |
|||
|
|||
public CancellationToken CancellationToken => cancellationLinked.Token; |
|||
|
|||
public Run(CancellationToken ct) |
|||
{ |
|||
cancellationLinked = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellationSource.Token); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
cancellationSource.Dispose(); |
|||
cancellationLinked.Dispose(); |
|||
} |
|||
|
|||
public void Cancel() |
|||
{ |
|||
try |
|||
{ |
|||
cancellationSource.Cancel(); |
|||
} |
|||
catch (ObjectDisposedException) |
|||
{ |
|||
// Cancellation token might have been disposed, if the run is completed.
|
|||
} |
|||
} |
|||
} |
|||
|
|||
public RuleRunnerProcessor( |
|||
DomainId appId, |
|||
IAppProvider appProvider, |
|||
IEventFormatter eventFormatter, |
|||
IEventStore eventStore, |
|||
ILocalCache localCache, |
|||
IPersistenceFactory<RuleRunnerState> persistenceFactory, |
|||
IRuleEventRepository ruleEventRepository, |
|||
IRuleService ruleService, |
|||
IRuleUsageTracker ruleUsageTracker, |
|||
ILogger<RuleRunnerProcessor> log) |
|||
{ |
|||
this.appId = appId; |
|||
this.appProvider = appProvider; |
|||
this.localCache = localCache; |
|||
this.eventStore = eventStore; |
|||
this.eventFormatter = eventFormatter; |
|||
this.ruleEventRepository = ruleEventRepository; |
|||
this.ruleService = ruleService; |
|||
this.ruleUsageTracker = ruleUsageTracker; |
|||
this.log = log; |
|||
|
|||
state = new SimpleState<RuleRunnerState>(persistenceFactory, GetType(), appId); |
|||
} |
|||
|
|||
public async Task LoadAsync( |
|||
CancellationToken ct = default) |
|||
{ |
|||
await state.LoadAsync(ct); |
|||
|
|||
if (!state.Value.RunFromSnapshots && state.Value.RuleId != default) |
|||
{ |
|||
TaskHelper.Forget(RunAsync(state.Value.RuleId, false, default)); |
|||
} |
|||
else |
|||
{ |
|||
await state.ClearAsync(ct); |
|||
} |
|||
} |
|||
|
|||
public Task CancelAsync() |
|||
{ |
|||
// Ensure that only one thread is accessing the current state at a time.
|
|||
return scheduler.Schedule(() => |
|||
{ |
|||
currentRun?.Cancel(); |
|||
}); |
|||
} |
|||
|
|||
public Task RunAsync(DomainId ruleId, bool fromSnapshots, |
|||
CancellationToken ct) |
|||
{ |
|||
return scheduler.ScheduleAsync(async ct => |
|||
{ |
|||
// There is no continuation token for snapshots, therefore we cannot continue with the run.
|
|||
if (currentRun?.Job.RunFromSnapshots == true) |
|||
{ |
|||
throw new DomainException(T.Get("rules.ruleAlreadyRunning")); |
|||
} |
|||
|
|||
var previousJob = state.Value; |
|||
|
|||
// If we have not removed the state, we have not completed the previous run and can therefore just continue.
|
|||
var position = |
|||
previousJob.RuleId == ruleId && !previousJob.RunFromSnapshots ? |
|||
previousJob.Position : |
|||
null; |
|||
|
|||
// Set the current run first to indicate that we are running a rule at the moment.
|
|||
var run = currentRun = new Run(ct) |
|||
{ |
|||
Job = new RuleRunnerState |
|||
{ |
|||
RuleId = ruleId, |
|||
RunId = DomainId.NewGuid(), |
|||
RunFromSnapshots = fromSnapshots, |
|||
Position = position |
|||
} |
|||
}; |
|||
|
|||
state.Value = run.Job; |
|||
try |
|||
{ |
|||
await state.WriteAsync(run.CancellationToken); |
|||
|
|||
await ProcessAsync(run, run.CancellationToken); |
|||
} |
|||
finally |
|||
{ |
|||
// Unset the run to indicate that we are done.
|
|||
currentRun.Dispose(); |
|||
currentRun = null; |
|||
} |
|||
}, ct); |
|||
} |
|||
|
|||
private async Task ProcessAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
try |
|||
{ |
|||
var rule = await appProvider.GetRuleAsync(appId, run.Job.RuleId, ct); |
|||
|
|||
// The rule might have been deleted in the meantime.
|
|||
if (rule == null) |
|||
{ |
|||
throw new DomainObjectNotFoundException(run.Job.RuleId.ToString()!); |
|||
} |
|||
|
|||
using (localCache.StartContext()) |
|||
{ |
|||
// Also run disabled rules, because we want to enable rules to be only used with manual trigger.
|
|||
run.Context = new RuleContext |
|||
{ |
|||
AppId = rule.AppId, |
|||
IncludeStale = true, |
|||
IncludeSkipped = true, |
|||
Rule = rule, |
|||
}; |
|||
|
|||
if (run.Job.RunFromSnapshots && ruleService.CanCreateSnapshotEvents(rule)) |
|||
{ |
|||
await EnqueueFromSnapshotsAsync(run, ct); |
|||
} |
|||
else |
|||
{ |
|||
await EnqueueFromEventsAsync(run, ct); |
|||
} |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
return; |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, "Failed to run rule with ID {ruleId}.", run.Job.RuleId); |
|||
} |
|||
finally |
|||
{ |
|||
// Remove the state to indicate that the run has been completed.
|
|||
await state.ClearAsync(default); |
|||
} |
|||
} |
|||
|
|||
private async Task EnqueueFromSnapshotsAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
// We collect errors and allow a few erors before we throw an exception.
|
|||
var errors = 0; |
|||
|
|||
// Write in batches of 100 items for better performance. Using completes the last write.
|
|||
await using var batch = new RuleQueueWriter(ruleEventRepository, ruleUsageTracker, null); |
|||
|
|||
await foreach (var result in ruleService.CreateSnapshotJobsAsync(run.Context, ct)) |
|||
{ |
|||
await batch.WriteAsync(result); |
|||
|
|||
if (result.EnrichmentError != null) |
|||
{ |
|||
errors++; |
|||
|
|||
// We accept a few errors and stop the process if there are too many errors.
|
|||
if (errors >= MaxErrors) |
|||
{ |
|||
throw result.EnrichmentError; |
|||
} |
|||
|
|||
log.LogWarning(result.EnrichmentError, "Failed to run rule with ID {ruleId}, continue with next job.", result.Rule?.Id); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task EnqueueFromEventsAsync(Run run, |
|||
CancellationToken ct) |
|||
{ |
|||
// We collect errors and allow a few erors before we throw an exception.
|
|||
var errors = 0; |
|||
|
|||
// Write in batches of 100 items for better performance. Using completes the last write.
|
|||
await using var batch = new RuleQueueWriter(ruleEventRepository, ruleUsageTracker, null); |
|||
|
|||
// Use a prefix query so that the storage can use an index for the query.
|
|||
var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)\\-{appId}"); |
|||
|
|||
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, run.Job.Position, ct: ct)) |
|||
{ |
|||
var @event = eventFormatter.ParseIfKnown(storedEvent); |
|||
|
|||
if (@event == null) |
|||
{ |
|||
continue; |
|||
} |
|||
|
|||
run.Job.Position = storedEvent.EventPosition; |
|||
|
|||
await foreach (var result in ruleService.CreateJobsAsync(@event, run.Context.ToRulesContext(), ct)) |
|||
{ |
|||
if (await batch.WriteAsync(result)) |
|||
{ |
|||
// Update the process when something has been written.
|
|||
await state.WriteAsync(ct); |
|||
} |
|||
|
|||
if (result.EnrichmentError != null) |
|||
{ |
|||
errors++; |
|||
|
|||
// We accept a few errors and stop the process if there are too many errors.
|
|||
if (errors >= MaxErrors) |
|||
{ |
|||
throw result.EnrichmentError; |
|||
} |
|||
|
|||
log.LogWarning(result.EnrichmentError, "Failed to run rule with ID {ruleId}, continue with next job.", result.Rule?.Id); |
|||
} |
|||
} |
|||
} |
|||
|
|||
if (await batch.FlushAsync()) |
|||
{ |
|||
// Update the process when something has been written.
|
|||
await state.WriteAsync(ct); |
|||
} |
|||
} |
|||
} |
|||
@ -1,23 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.States; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Rules.Runner; |
|||
|
|||
[CollectionName("Rules_Runner")] |
|||
public sealed class RuleRunnerState |
|||
{ |
|||
public DomainId RuleId { get; set; } |
|||
|
|||
public DomainId RunId { get; set; } |
|||
|
|||
public string? Position { get; set; } |
|||
|
|||
public bool RunFromSnapshots { get; set; } |
|||
} |
|||
@ -1,78 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Squidex.Hosting; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.States; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Rules.Runner; |
|||
|
|||
public sealed class RuleRunnerWorker : |
|||
IBackgroundProcess, |
|||
IMessageHandler<RuleRunnerRun>, |
|||
IMessageHandler<RuleRunnerCancel> |
|||
{ |
|||
private readonly Dictionary<DomainId, Task<RuleRunnerProcessor>> processors = []; |
|||
private readonly Func<DomainId, RuleRunnerProcessor> processorFactory; |
|||
private readonly ISnapshotStore<RuleRunnerState> snapshotStore; |
|||
|
|||
public RuleRunnerWorker(IServiceProvider serviceProvider, ISnapshotStore<RuleRunnerState> snapshotStore) |
|||
{ |
|||
var objectFactory = ActivatorUtilities.CreateFactory(typeof(RuleRunnerProcessor), [typeof(DomainId)]); |
|||
|
|||
processorFactory = key => |
|||
{ |
|||
return (RuleRunnerProcessor)objectFactory(serviceProvider, new object[] { key }); |
|||
}; |
|||
|
|||
this.snapshotStore = snapshotStore; |
|||
} |
|||
|
|||
public async Task StartAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
await foreach (var snapshot in snapshotStore.ReadAllAsync(ct)) |
|||
{ |
|||
await GetProcessorAsync(snapshot.Key, ct); |
|||
} |
|||
} |
|||
|
|||
public async Task HandleAsync(RuleRunnerRun message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetProcessorAsync(message.AppId, ct); |
|||
|
|||
await processor.RunAsync(message.RuleId, message.FromSnapshots, ct); |
|||
} |
|||
|
|||
public async Task HandleAsync(RuleRunnerCancel message, |
|||
CancellationToken ct) |
|||
{ |
|||
var processor = await GetProcessorAsync(message.AppId, ct); |
|||
|
|||
await processor.CancelAsync(); |
|||
} |
|||
|
|||
private Task<RuleRunnerProcessor> GetProcessorAsync(DomainId appId, |
|||
CancellationToken ct) |
|||
{ |
|||
// Use a normal dictionary to avoid double creations.
|
|||
lock (processors) |
|||
{ |
|||
return processors.GetOrAdd(appId, async key => |
|||
{ |
|||
var processor = processorFactory(key); |
|||
|
|||
await processor.LoadAsync(ct); |
|||
|
|||
return processor; |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,67 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.AspNetCore.Authorization; |
|||
using Microsoft.AspNetCore.Mvc; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Commands; |
|||
using Squidex.Web; |
|||
|
|||
namespace Squidex.Areas.Api.Controllers.Jobs; |
|||
|
|||
/// <summary>
|
|||
/// Update and query jobs for app.
|
|||
/// </summary>
|
|||
[ApiExplorerSettings(GroupName = nameof(Jobs))] |
|||
public class JobsContentController : ApiController |
|||
{ |
|||
private readonly IJobService jobService; |
|||
|
|||
public JobsContentController(ICommandBus commandBus, |
|||
IJobService jobService) |
|||
: base(commandBus) |
|||
{ |
|||
this.jobService = jobService; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Get the job content.
|
|||
/// </summary>
|
|||
/// <param name="id">The ID of the job.</param>
|
|||
/// <param name="appId">The ID of the app.</param>
|
|||
/// <response code="200">Job found and content returned.</response>
|
|||
/// <response code="404">Job or app not found.</response>
|
|||
[HttpGet] |
|||
[Route("apps/jobs/{id}")] |
|||
[ResponseCache(Duration = 3600 * 24 * 30)] |
|||
[ProducesResponseType(typeof(FileResult), StatusCodes.Status200OK)] |
|||
[ApiCosts(0)] |
|||
[AllowAnonymous] |
|||
public async Task<IActionResult> GetJobContent(DomainId id, [FromQuery] DomainId appId = default) |
|||
{ |
|||
var jobs = await jobService.GetJobsAsync(appId, HttpContext.RequestAborted); |
|||
var job = jobs.Find(x => x.Id == id); |
|||
|
|||
if (job is not { Status: JobStatus.Completed } || job.File == null) |
|||
{ |
|||
return NotFound(); |
|||
} |
|||
|
|||
var callback = new FileCallback((body, range, ct) => |
|||
{ |
|||
return jobService.DownloadAsync(job, body, ct); |
|||
}); |
|||
|
|||
return new FileCallbackResult(job.File.MimeType, callback) |
|||
{ |
|||
FileDownloadName = job.File.Name, |
|||
FileSize = null, |
|||
ErrorAs404 = true |
|||
}; |
|||
} |
|||
} |
|||
@ -0,0 +1,70 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.AspNetCore.Mvc; |
|||
using Squidex.Areas.Api.Controllers.Jobs.Models; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Commands; |
|||
using Squidex.Shared; |
|||
using Squidex.Web; |
|||
|
|||
namespace Squidex.Areas.Api.Controllers.Jobs; |
|||
|
|||
/// <summary>
|
|||
/// Update and query jobs for apps.
|
|||
/// </summary>
|
|||
[ApiExplorerSettings(GroupName = nameof(Jobs))] |
|||
public class JobsController : ApiController |
|||
{ |
|||
private readonly IJobService jobService; |
|||
|
|||
public JobsController(ICommandBus commandBus, IJobService jobService) |
|||
: base(commandBus) |
|||
{ |
|||
this.jobService = jobService; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Get all jobs.
|
|||
/// </summary>
|
|||
/// <param name="app">The name of the app.</param>
|
|||
/// <response code="200">Jobs returned.</response>
|
|||
/// <response code="404">App not found.</response>
|
|||
[HttpGet] |
|||
[Route("apps/{app}/jobs/")] |
|||
[ProducesResponseType(typeof(JobsDto), StatusCodes.Status200OK)] |
|||
[ApiPermissionOrAnonymous(PermissionIds.AppJobsRead)] |
|||
[ApiCosts(0)] |
|||
public async Task<IActionResult> GetJobs(string app) |
|||
{ |
|||
var jobs = await jobService.GetJobsAsync(App.Id, HttpContext.RequestAborted); |
|||
|
|||
var result = JobsDto.FromDomain(jobs, Resources); |
|||
|
|||
return Ok(result); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Delete a job.
|
|||
/// </summary>
|
|||
/// <param name="app">The name of the app.</param>
|
|||
/// <param name="id">The ID of the jobs to delete.</param>
|
|||
/// <response code="204">Job deleted.</response>
|
|||
/// <response code="404">Job or app not found.</response>
|
|||
[HttpDelete] |
|||
[Route("apps/{app}/jobs/{id}")] |
|||
[ProducesResponseType(StatusCodes.Status204NoContent)] |
|||
[ApiPermissionOrAnonymous(PermissionIds.AppJobsDelete)] |
|||
[ApiCosts(0)] |
|||
public async Task<IActionResult> DeleteJob(string app, DomainId id) |
|||
{ |
|||
await jobService.DeleteJobAsync(App.Id, id, default); |
|||
|
|||
return NoContent(); |
|||
} |
|||
} |
|||
@ -0,0 +1,98 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Collections; |
|||
using Squidex.Infrastructure.Reflection; |
|||
using Squidex.Web; |
|||
|
|||
namespace Squidex.Areas.Api.Controllers.Jobs.Models; |
|||
|
|||
public sealed class JobDto : Resource |
|||
{ |
|||
/// <summary>
|
|||
/// The ID of the job.
|
|||
/// </summary>
|
|||
public DomainId Id { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The time when the job has been started.
|
|||
/// </summary>
|
|||
public Instant Started { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The time when the job has been stopped.
|
|||
/// </summary>
|
|||
public Instant? Stopped { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The status of the operation.
|
|||
/// </summary>
|
|||
public JobStatus Status { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The name of the task.
|
|||
/// </summary>
|
|||
public string TaskName { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The description of the job.
|
|||
/// </summary>
|
|||
public string Description { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The arguments for the job.
|
|||
/// </summary>
|
|||
public ReadonlyDictionary<string, string> TaskArguments { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The list of log items.
|
|||
/// </summary>
|
|||
public List<JobLogMessageDto> Log { get; set; } = []; |
|||
|
|||
/// <summary>
|
|||
/// Indicates whether the job can be downloaded.
|
|||
/// </summary>
|
|||
public bool CanDownload { get; set; } |
|||
|
|||
public static JobDto FromDomain(Job job, Resources resources) |
|||
{ |
|||
var result = SimpleMapper.Map(job, new JobDto()); |
|||
|
|||
if (job.Log?.Count > 0) |
|||
{ |
|||
result.Log = job.Log.Select(JobLogMessageDto.FromDomain).ToList(); |
|||
} |
|||
|
|||
result.TaskArguments = job.Arguments; |
|||
|
|||
return result.CreateLinks(job, resources); |
|||
} |
|||
|
|||
private JobDto CreateLinks(Job job, Resources resources) |
|||
{ |
|||
if (resources.CanDeleteJob) |
|||
{ |
|||
var values = new { app = resources.App, id = Id }; |
|||
|
|||
AddDeleteLink("delete", |
|||
resources.Url<JobsController>(x => nameof(x.DeleteJob), values)); |
|||
} |
|||
|
|||
if (resources.CanDownloadJob && Status == JobStatus.Completed && job.File != null) |
|||
{ |
|||
var values = new { appId = resources.AppId, id = Id }; |
|||
|
|||
AddGetLink("download", |
|||
resources.Url<JobsContentController>(x => nameof(x.GetJobContent), values)); |
|||
} |
|||
|
|||
return this; |
|||
} |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
namespace Squidex.Areas.Api.Controllers.Jobs.Models; |
|||
|
|||
public class JobLogMessageDto |
|||
{ |
|||
/// <summary>
|
|||
/// The timestamp.
|
|||
/// </summary>
|
|||
public Instant Timestamp { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The log message.
|
|||
/// </summary>
|
|||
public string Message { get; set; } |
|||
|
|||
public static JobLogMessageDto FromDomain(JobLogMessage source) |
|||
{ |
|||
return new JobLogMessageDto { Timestamp = source.Timestamp, Message = source.Message }; |
|||
} |
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Areas.Api.Controllers.Backups; |
|||
using Squidex.Domain.Apps.Entities.Jobs; |
|||
using Squidex.Web; |
|||
|
|||
namespace Squidex.Areas.Api.Controllers.Jobs.Models; |
|||
|
|||
public sealed class JobsDto : Resource |
|||
{ |
|||
/// <summary>
|
|||
/// The jobs.
|
|||
/// </summary>
|
|||
public JobDto[] Items { get; set; } |
|||
|
|||
public static JobsDto FromDomain(IEnumerable<Job> jobs, Resources resources) |
|||
{ |
|||
var result = new JobsDto |
|||
{ |
|||
Items = jobs.Select(x => JobDto.FromDomain(x, resources)).ToArray() |
|||
}; |
|||
|
|||
return result.CreateLinks(resources); |
|||
} |
|||
|
|||
private JobsDto CreateLinks(Resources resources) |
|||
{ |
|||
var values = new { app = resources.App }; |
|||
|
|||
AddSelfLink(resources.Url<JobsController>(x => nameof(x.GetJobs), values)); |
|||
|
|||
if (resources.CanCreateBackup) |
|||
{ |
|||
AddPostLink("create/backups", |
|||
resources.Url<BackupsController>(x => nameof(x.PostBackup), values)); |
|||
} |
|||
|
|||
return this; |
|||
} |
|||
} |
|||
@ -1,161 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.Backup.State; |
|||
using Squidex.Domain.Apps.Entities.TestHelpers; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.TestHelpers; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Backup; |
|||
|
|||
public class BackupServiceTests : GivenContext |
|||
{ |
|||
private readonly TestState<BackupState> stateBackup; |
|||
private readonly TestState<BackupRestoreState> stateRestore; |
|||
private readonly IMessageBus messaging = A.Fake<IMessageBus>(); |
|||
private readonly DomainId backupId = DomainId.NewGuid(); |
|||
private readonly BackupService sut; |
|||
|
|||
public BackupServiceTests() |
|||
{ |
|||
stateRestore = new TestState<BackupRestoreState>("Default"); |
|||
stateBackup = new TestState<BackupState>(AppId.Id); |
|||
|
|||
sut = new BackupService( |
|||
stateRestore.PersistenceFactory, |
|||
stateBackup.PersistenceFactory, messaging); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_restore_backup() |
|||
{ |
|||
var restoreUrl = new Uri("http://squidex.io"); |
|||
var restoreAppName = "New App"; |
|||
|
|||
await sut.StartRestoreAsync(User, restoreUrl, restoreAppName, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new BackupRestore(User, restoreUrl, restoreAppName), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_start_backup() |
|||
{ |
|||
await sut.StartBackupAsync(AppId.Id, User, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new BackupStart(AppId.Id, User), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_delete_backup() |
|||
{ |
|||
await sut.DeleteBackupAsync(AppId.Id, backupId, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new BackupDelete(AppId.Id, backupId), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_clear_backups() |
|||
{ |
|||
await ((IDeleter)sut).DeleteAppAsync(App, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new BackupClear(AppId.Id), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_when_restore_already_running() |
|||
{ |
|||
stateRestore.Snapshot = new BackupRestoreState |
|||
{ |
|||
Job = new RestoreJob |
|||
{ |
|||
Status = JobStatus.Started |
|||
} |
|||
}; |
|||
|
|||
var restoreUrl = new Uri("http://squidex.io"); |
|||
|
|||
await Assert.ThrowsAnyAsync<DomainException>(() => sut.StartRestoreAsync(User, restoreUrl, null, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_when_backup_has_too_many_jobs() |
|||
{ |
|||
for (var i = 0; i < 10; i++) |
|||
{ |
|||
stateBackup.Snapshot.Jobs.Add(new BackupJob()); |
|||
} |
|||
|
|||
await Assert.ThrowsAnyAsync<DomainException>(() => sut.StartBackupAsync(AppId.Id, User, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_when_backup_has_one_running_job() |
|||
{ |
|||
for (var i = 0; i < 2; i++) |
|||
{ |
|||
stateBackup.Snapshot.Jobs.Add(new BackupJob { Status = JobStatus.Started }); |
|||
} |
|||
|
|||
await Assert.ThrowsAnyAsync<DomainException>(() => sut.StartBackupAsync(AppId.Id, User, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_get_restore_state_from_store() |
|||
{ |
|||
stateRestore.Snapshot = new BackupRestoreState |
|||
{ |
|||
Job = new RestoreJob |
|||
{ |
|||
Stopped = SystemClock.Instance.GetCurrentInstant() |
|||
} |
|||
}; |
|||
|
|||
var actual = await sut.GetRestoreAsync(CancellationToken); |
|||
|
|||
actual.Should().BeEquivalentTo(stateRestore.Snapshot.Job); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_get_backups_state_from_store() |
|||
{ |
|||
var job = new BackupJob |
|||
{ |
|||
Id = backupId, |
|||
Started = SystemClock.Instance.GetCurrentInstant(), |
|||
Stopped = SystemClock.Instance.GetCurrentInstant() |
|||
}; |
|||
|
|||
stateBackup.Snapshot.Jobs.Add(job); |
|||
|
|||
var actual = await sut.GetBackupsAsync(AppId.Id, CancellationToken); |
|||
|
|||
actual.Should().BeEquivalentTo(stateBackup.Snapshot.Jobs); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_get_backup_state_from_store() |
|||
{ |
|||
var job = new BackupJob |
|||
{ |
|||
Id = backupId, |
|||
Started = SystemClock.Instance.GetCurrentInstant(), |
|||
Stopped = SystemClock.Instance.GetCurrentInstant() |
|||
}; |
|||
|
|||
stateBackup.Snapshot.Jobs.Add(job); |
|||
|
|||
var actual = await sut.GetBackupAsync(AppId.Id, backupId, CancellationToken); |
|||
|
|||
actual.Should().BeEquivalentTo(job); |
|||
} |
|||
} |
|||
@ -0,0 +1,162 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Entities.TestHelpers; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.TestHelpers; |
|||
using Squidex.Messaging; |
|||
|
|||
namespace Squidex.Domain.Apps.Entities.Jobs; |
|||
|
|||
public class DefaultJobsServiceTests : GivenContext |
|||
{ |
|||
private readonly TestState<JobsState> state; |
|||
private readonly IJobRunner runner1 = A.Fake<IJobRunner>(); |
|||
private readonly IJobRunner runner2 = A.Fake<IJobRunner>(); |
|||
private readonly IMessageBus messaging = A.Fake<IMessageBus>(); |
|||
private readonly Stream stream = new MemoryStream(); |
|||
private readonly DomainId jobId = DomainId.NewGuid(); |
|||
private readonly DefaultJobService sut; |
|||
|
|||
public DefaultJobsServiceTests() |
|||
{ |
|||
state = new TestState<JobsState>(AppId.Id); |
|||
|
|||
A.CallTo(() => runner1.Name) |
|||
.Returns("job1"); |
|||
|
|||
A.CallTo(() => runner1.MaxJobs) |
|||
.Returns(2); |
|||
|
|||
A.CallTo(() => runner2.Name) |
|||
.Returns("job2"); |
|||
|
|||
sut = new DefaultJobService(messaging, new[] { runner1, runner2 }, state.PersistenceFactory); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_start_job() |
|||
{ |
|||
var request = JobRequest.Create(User, "job1"); |
|||
|
|||
await sut.StartAsync(AppId.Id, request, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new JobStart(AppId.Id, request), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_delete_backup() |
|||
{ |
|||
await sut.DeleteJobAsync(AppId.Id, jobId, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new JobDelete(AppId.Id, jobId), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_send_message_to_clear_backups() |
|||
{ |
|||
await ((IDeleter)sut).DeleteAppAsync(App, CancellationToken); |
|||
|
|||
A.CallTo(() => messaging.PublishAsync(new JobClear(AppId.Id), null, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_when_job_is_invalid() |
|||
{ |
|||
var request = JobRequest.Create(User, "unknown"); |
|||
|
|||
await Assert.ThrowsAnyAsync<DomainException>(() => sut.StartAsync(App.Id, request, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_when_job_is_already_running() |
|||
{ |
|||
state.Snapshot.Jobs.Add(new Job { Status = JobStatus.Started }); |
|||
|
|||
var request = JobRequest.Create(User, "job1"); |
|||
|
|||
await Assert.ThrowsAnyAsync<DomainException>(() => sut.StartAsync(App.Id, request, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_when_backup_has_too_many_jobs() |
|||
{ |
|||
state.Snapshot.Jobs.Add(new Job { TaskName = "job1", File = new JobFile("file", "type") }); |
|||
state.Snapshot.Jobs.Add(new Job { TaskName = "job1", File = new JobFile("file", "type") }); |
|||
|
|||
var request = JobRequest.Create(User, "job1"); |
|||
|
|||
await Assert.ThrowsAnyAsync<DomainException>(() => sut.StartAsync(App.Id, request, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_not_throw_exception_when_backup_has_too_many_jobs_without_files() |
|||
{ |
|||
state.Snapshot.Jobs.Add(new Job { TaskName = "job1" }); |
|||
state.Snapshot.Jobs.Add(new Job { TaskName = "job1" }); |
|||
|
|||
var request = JobRequest.Create(User, "job1"); |
|||
|
|||
await sut.StartAsync(App.Id, request, CancellationToken); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_get_backups_state_from_store() |
|||
{ |
|||
var job = new Job |
|||
{ |
|||
Id = jobId, |
|||
Started = SystemClock.Instance.GetCurrentInstant(), |
|||
Stopped = SystemClock.Instance.GetCurrentInstant() |
|||
}; |
|||
|
|||
state.Snapshot.Jobs.Add(job); |
|||
|
|||
var actual = await sut.GetJobsAsync(AppId.Id, CancellationToken); |
|||
|
|||
actual.Should().BeEquivalentTo(state.Snapshot.Jobs); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_download_file() |
|||
{ |
|||
var job = new Job { TaskName = "job2", Status = JobStatus.Completed, File = new JobFile("file", "type") }; |
|||
|
|||
await sut.DownloadAsync(job, stream, CancellationToken); |
|||
|
|||
A.CallTo(() => runner2.DownloadAsync(job, stream, CancellationToken)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_if_job_to_download_has_no_file() |
|||
{ |
|||
var job = new Job { TaskName = "job2", Status = JobStatus.Completed, File = null }; |
|||
|
|||
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.DownloadAsync(job, stream, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_if_job_is_not_completed() |
|||
{ |
|||
var job = new Job { TaskName = "job2", Status = JobStatus.Started, File = new JobFile("file", "type") }; |
|||
|
|||
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.DownloadAsync(job, stream, CancellationToken)); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_throw_exception_if_job_has_invalid_task_name() |
|||
{ |
|||
var job = new Job { TaskName = "invalid", Status = JobStatus.Completed, File = new JobFile("file", "type") }; |
|||
|
|||
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.DownloadAsync(job, stream, CancellationToken)); |
|||
} |
|||
} |
|||
@ -1,49 +0,0 @@ |
|||
<div class="table-items-row table-items-row-summary"> |
|||
<div class="row"> |
|||
<div class="col-auto" [ngSwitch]="backup.status"> |
|||
<sqx-status-icon size="lg" [status]="backup.status"></sqx-status-icon> |
|||
</div> |
|||
<div class="col-auto"> |
|||
<div> |
|||
{{ 'backups.startedLabel' | sqxTranslate }}: |
|||
</div> |
|||
<div> |
|||
{{ 'backups.backupDuration' | sqxTranslate }}: |
|||
</div> |
|||
</div> |
|||
<div class="col text-nowrap"> |
|||
<div> |
|||
{{backup.started | sqxFromNow}} |
|||
</div> |
|||
<div *ngIf="backup.stopped"> |
|||
{{duration}} |
|||
</div> |
|||
</div> |
|||
<div class="col"> |
|||
<div class="text-nowrap"> |
|||
<span title="i18n:backups.backupCountEventsTooltip"> |
|||
{{ 'backups.backupCountEventsLabel' | sqxTranslate }}: <strong class="backup-progress">{{backup.handledEvents | sqxKNumber}}</strong> |
|||
</span>, |
|||
<span title="i18n:backups.backupCountAssetsTooltip"> |
|||
{{ 'backups.backupCountAssetsLabel' | sqxTranslate }}: <strong class="backup-progress">{{backup.handledAssets | sqxKNumber}}</strong> |
|||
</span> |
|||
</div> |
|||
<div *ngIf="backup.canDownload"> |
|||
{{ 'backups.backupDownload' | sqxTranslate }}: |
|||
|
|||
<a href="{{apiUrl.buildUrl(backup.downloadUrl)}}" sqxExternalLink="noicon"> |
|||
{{ 'backups.backupDownloadLink' | sqxTranslate }} <i class="icon-external-link"></i> |
|||
</a> |
|||
</div> |
|||
</div> |
|||
<div class="col-auto"> |
|||
<button type="button" class="btn btn-text-danger mt-1" [disabled]="!backup.canDelete" |
|||
(sqxConfirmClick)="delete()" |
|||
confirmTitle="i18n:backups.deleteConfirmTitle" |
|||
confirmText="i18n:backups.deleteConfirmText" |
|||
confirmRememberKey="deleteBackup"> |
|||
<i class="icon-bin2"></i> |
|||
</button> |
|||
</div> |
|||
</div> |
|||
</div> |
|||
@ -1,51 +0,0 @@ |
|||
/* |
|||
* Squidex Headless CMS |
|||
* |
|||
* @license |
|||
* Copyright (c) Squidex UG (haftungsbeschränkt). All rights reserved. |
|||
*/ |
|||
|
|||
import { NgIf, NgSwitch } from '@angular/common'; |
|||
import { ChangeDetectionStrategy, Component, Input } from '@angular/core'; |
|||
import { ApiUrlConfig, BackupDto, BackupsState, ConfirmClickDirective, Duration, ExternalLinkDirective, FromNowPipe, KNumberPipe, StatusIconComponent, TooltipDirective, TranslatePipe, TypedSimpleChanges } from '@app/shared'; |
|||
|
|||
@Component({ |
|||
standalone: true, |
|||
selector: 'sqx-backup', |
|||
styleUrls: ['./backup.component.scss'], |
|||
templateUrl: './backup.component.html', |
|||
changeDetection: ChangeDetectionStrategy.OnPush, |
|||
imports: [ |
|||
ConfirmClickDirective, |
|||
ExternalLinkDirective, |
|||
FromNowPipe, |
|||
KNumberPipe, |
|||
NgIf, |
|||
NgSwitch, |
|||
StatusIconComponent, |
|||
TooltipDirective, |
|||
TranslatePipe, |
|||
], |
|||
}) |
|||
export class BackupComponent { |
|||
@Input({ required: true }) |
|||
public backup!: BackupDto; |
|||
|
|||
public duration = ''; |
|||
|
|||
constructor( |
|||
public readonly apiUrl: ApiUrlConfig, |
|||
private readonly backupsState: BackupsState, |
|||
) { |
|||
} |
|||
|
|||
public ngOnChanges(changes: TypedSimpleChanges<this>) { |
|||
if (changes.backup) { |
|||
this.duration = Duration.create(this.backup.started, this.backup.stopped!).toString(); |
|||
} |
|||
} |
|||
|
|||
public delete() { |
|||
this.backupsState.delete(this.backup); |
|||
} |
|||
} |
|||
@ -1,51 +0,0 @@ |
|||
<sqx-title message="i18n:common.backups"></sqx-title> |
|||
|
|||
<sqx-layout layout="main" titleText="i18n:common.backups" titleIcon="backups" innerWidth="50"> |
|||
<ng-container menu> |
|||
<button type="button" class="btn btn-text-secondary me-2" (click)="reload()" title="i18n:backups.refreshTooltip" shortcut="CTRL + B"> |
|||
<i class="icon-reset"></i> {{ 'common.refresh' | sqxTranslate }} |
|||
</button> |
|||
|
|||
<button type="button" class="btn btn-success" [disabled]="backupsState.maxBackupsReached | async" *ngIf="backupsState.canCreate | async" (click)="start()"> |
|||
{{ 'backups.start' | sqxTranslate }} |
|||
</button> |
|||
</ng-container> |
|||
|
|||
<ng-container> |
|||
<sqx-list-view innerWidth="50rem" [isLoading]="backupsState.isLoading | async"> |
|||
<div class="alert alert-danger mb-4" *ngIf="backupsState.maxBackupsReached | async"> |
|||
{{ 'backups.maximumReached' | sqxTranslate }} |
|||
</div> |
|||
|
|||
<ng-container *ngIf="(backupsState.isLoaded | async) && (backupsState.backups | async); let backups"> |
|||
<div class="table-items-row table-items-row-summary table-items-row-empty" *ngIf="backups.length === 0"> |
|||
{{ 'backups.empty' | sqxTranslate }} |
|||
|
|||
<button type="button" class="btn btn-success btn-sm me-2" (click)="start()" *ngIf="backupsState.canCreate | async"> |
|||
{{ 'backups.start' | sqxTranslate }} |
|||
</button> |
|||
</div> |
|||
|
|||
<sqx-backup *ngFor="let backup of backups; trackBy: trackByBackup" |
|||
[backup]="backup"> |
|||
</sqx-backup> |
|||
</ng-container> |
|||
</sqx-list-view> |
|||
</ng-container> |
|||
|
|||
<ng-template sidebarMenu> |
|||
<div class="panel-nav"> |
|||
<a class="panel-link" |
|||
routerLink="help" |
|||
routerLinkActive="active" |
|||
queryParamsHandling="preserve" |
|||
title="i18n:common.help" |
|||
titlePosition="left" |
|||
sqxTourStep="help"> |
|||
<i class="icon-help2"></i> |
|||
</a> |
|||
</div> |
|||
</ng-template> |
|||
</sqx-layout> |
|||
|
|||
<router-outlet></router-outlet> |
|||
@ -1,2 +0,0 @@ |
|||
@import 'mixins'; |
|||
@import 'vars'; |
|||
@ -0,0 +1,60 @@ |
|||
<div class="table-items-row table-items-row-expandable"> |
|||
<div class="table-items-row-summary"> |
|||
<div class="row align-items-center"> |
|||
<div class="col-auto pe-4" [ngSwitch]="job.status"> |
|||
<sqx-status-icon size="lg" [status]="job.status"></sqx-status-icon> |
|||
</div> |
|||
<div class="col"> |
|||
<div> |
|||
<h4>{{job.description}}</h4> |
|||
</div> |
|||
|
|||
<div class="row"> |
|||
<div class="col"> |
|||
{{ 'jobs.startedLabel' | sqxTranslate }}: |
|||
|
|||
<span> |
|||
{{job.started | sqxFromNow}} |
|||
</span> |
|||
</div> |
|||
<div class="col"> |
|||
{{ 'jobs.jobDuration' | sqxTranslate }}: |
|||
|
|||
<span *ngIf="job.stopped"> |
|||
{{duration}} |
|||
</span> |
|||
</div> |
|||
</div> |
|||
</div> |
|||
<div class="col-options text-right"> |
|||
<a class="btn btn-text-secondary" href="{{apiUrl.buildUrl(job.downloadUrl || '')}}" [class.invisible]="!job.downloadUrl" sqxExternalLink="noicon"> |
|||
<i class="icon-download"></i> |
|||
</a> |
|||
|
|||
<button type="button" class="btn btn-outline-secondary btn-expand ms-1" [class.expanded]="expanded" (click)="toggleExpanded()"> |
|||
<span class="hidden">{{ 'common.settings' | sqxTranslate }}</span> |
|||
<i class="icon-settings"></i> |
|||
</button> |
|||
|
|||
<button type="button" class="btn btn-text-danger ms-1" [disabled]="!job.canDelete" |
|||
(sqxConfirmClick)="delete()" |
|||
confirmTitle="i18n:jobs.deleteConfirmTitle" |
|||
confirmText="i18n:jobs.deleteConfirmText" |
|||
confirmRememberKey="deleteBackup"> |
|||
<i class="icon-bin2"></i> |
|||
</button> |
|||
</div> |
|||
</div> |
|||
</div> |
|||
|
|||
<div class="table-items-row-details" *ngIf="expanded"> |
|||
<div class="job-header"> |
|||
<h4>{{ 'common.details' | sqxTranslate }}</h4> |
|||
</div> |
|||
<div class="row job-dump"> |
|||
<div class="col-12"> |
|||
<sqx-code-editor [ngModel]="details" disabled="true" wordWrap="true" height="auto" mode="ace/mode/text"></sqx-code-editor> |
|||
</div> |
|||
</div> |
|||
</div> |
|||
</div> |
|||
@ -0,0 +1,32 @@ |
|||
@import 'mixins'; |
|||
@import 'vars'; |
|||
|
|||
.col { |
|||
&-options { |
|||
max-width: 160px; |
|||
} |
|||
} |
|||
|
|||
.job { |
|||
&-header, |
|||
&-dump { |
|||
padding: .75rem 1.25rem; |
|||
} |
|||
|
|||
&-header { |
|||
background: $color-border-light; |
|||
border: 0; |
|||
border-bottom: 2px solid $color-border; |
|||
position: relative; |
|||
|
|||
h4 { |
|||
font-size: 1rem; |
|||
font-weight: 500; |
|||
margin: 0; |
|||
} |
|||
} |
|||
} |
|||
|
|||
.btn-expand.expanded::before { |
|||
bottom: -1.35rem; |
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
/* |
|||
* Squidex Headless CMS |
|||
* |
|||
* @license |
|||
* Copyright (c) Squidex UG (haftungsbeschränkt). All rights reserved. |
|||
*/ |
|||
|
|||
import { NgIf, NgSwitch } from '@angular/common'; |
|||
import { ChangeDetectionStrategy, Component, Input } from '@angular/core'; |
|||
import { FormsModule } from '@angular/forms'; |
|||
import { ApiUrlConfig, CodeEditorComponent, ConfirmClickDirective, Duration, ExternalLinkDirective, FromNowPipe, JobDto, JobsState, KNumberPipe, StatusIconComponent, TooltipDirective, TranslatePipe, TypedSimpleChanges } from '@app/shared'; |
|||
|
|||
@Component({ |
|||
standalone: true, |
|||
selector: 'sqx-job', |
|||
styleUrls: ['./job.component.scss'], |
|||
templateUrl: './job.component.html', |
|||
changeDetection: ChangeDetectionStrategy.OnPush, |
|||
imports: [ |
|||
CodeEditorComponent, |
|||
ConfirmClickDirective, |
|||
ExternalLinkDirective, |
|||
FormsModule, |
|||
FromNowPipe, |
|||
KNumberPipe, |
|||
NgIf, |
|||
NgSwitch, |
|||
StatusIconComponent, |
|||
TooltipDirective, |
|||
TranslatePipe, |
|||
], |
|||
}) |
|||
export class JobComponent { |
|||
@Input({ required: true }) |
|||
public job!: JobDto; |
|||
|
|||
public duration = ''; |
|||
public details = ''; |
|||
|
|||
public expanded = false; |
|||
|
|||
constructor( |
|||
public readonly apiUrl: ApiUrlConfig, |
|||
private readonly jobsState: JobsState, |
|||
) { |
|||
} |
|||
|
|||
public ngOnChanges(changes: TypedSimpleChanges<this>) { |
|||
if (changes.job) { |
|||
this.duration = Duration.create(this.job.started, this.job.stopped!).toString(); |
|||
|
|||
this.details = ''; |
|||
this.details += 'Arguments:\n'; |
|||
this.details += JSON.stringify(this.job.taskArguments, undefined, 2); |
|||
|
|||
if (this.job.log.length > 0) { |
|||
this.details += '\n\nLog:'; |
|||
|
|||
for (const log of this.job.log) { |
|||
this.details += `\n${log.timestamp.toISODateUTC()} ${log.message}`; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
public delete() { |
|||
this.jobsState.delete(this.job); |
|||
} |
|||
|
|||
public toggleExpanded() { |
|||
this.expanded = !this.expanded; |
|||
} |
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
<sqx-title message="i18n:common.jobsBackups"></sqx-title> |
|||
|
|||
<sqx-layout layout="main" titleText="i18n:common.jobsBackups" titleIcon="jobs" innerWidth="50"> |
|||
<ng-container menu> |
|||
<button type="button" class="btn btn-text-secondary me-2" (click)="reload()" title="i18n:jobs.refreshTooltip" shortcut="CTRL + B"> |
|||
<i class="icon-reset"></i> {{ 'common.refresh' | sqxTranslate }} |
|||
</button> |
|||
|
|||
<button type="button" class="btn btn-success" [disabled]="jobsState.maxBackupsReached | async" *ngIf="jobsState.canCreateBackup | async" (click)="startBackup()"> |
|||
{{ 'jobs.backupStart' | sqxTranslate }} |
|||
</button> |
|||
</ng-container> |
|||
|
|||
<ng-container> |
|||
<sqx-list-view innerWidth="50rem" [isLoading]="jobsState.isLoading | async"> |
|||
<div class="alert alert-danger mb-4" *ngIf="jobsState.maxBackupsReached | async"> |
|||
{{ 'jobs.backupMaximumReached' | sqxTranslate }} |
|||
</div> |
|||
|
|||
<ng-container *ngIf="(jobsState.isLoaded | async) && (jobsState.jobs | async); let jobs"> |
|||
<div class="table-items-row table-items-row-summary table-items-row-empty" *ngIf="jobs.length === 0"> |
|||
{{ 'jobs.empty' | sqxTranslate }} |
|||
</div> |
|||
|
|||
<sqx-job *ngFor="let job of jobs; trackBy: trackByJob" |
|||
[job]="job"> |
|||
</sqx-job> |
|||
</ng-container> |
|||
</sqx-list-view> |
|||
</ng-container> |
|||
|
|||
<ng-template sidebarMenu> |
|||
<div class="panel-nav"> |
|||
<a class="panel-link" |
|||
routerLink="help" |
|||
routerLinkActive="active" |
|||
queryParamsHandling="preserve" |
|||
title="i18n:common.help" |
|||
titlePosition="left" |
|||
sqxTourStep="help"> |
|||
<i class="icon-help2"></i> |
|||
</a> |
|||
</div> |
|||
</ng-template> |
|||
</sqx-layout> |
|||
|
|||
<router-outlet></router-outlet> |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue