From e8aea1acdc93ca64e6256c90a6115c8dd608a403 Mon Sep 17 00:00:00 2001 From: cKey <35512826+colinin@users.noreply.github.com> Date: Mon, 20 Mar 2023 11:08:16 +0800 Subject: [PATCH 1/2] feat(tasks): add status Queuing --- .../api/task-management/model/backgroundJobInfoModel.ts | 1 + .../background-jobs/components/JobModal.vue | 4 ++-- .../task-management/background-jobs/datas/ModalData.ts | 2 ++ .../views/task-management/background-jobs/datas/typing.ts | 2 ++ .../LINGYUN/Abp/BackgroundTasks/JobStatus.cs | 4 ++++ .../Abp/BackgroundTasks/Quartz/QuartzJobListener.cs | 6 +++++- .../BackgroundTasks/Quartz/QuartzJobSearchJobAdapter.cs | 2 +- .../Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs | 1 + .../Quartz/IJobExecutionContextExtensions.cs | 7 +++++++ .../LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs | 1 - .../Abp/TaskManagement/Localization/Resources/en.json | 1 + .../Abp/TaskManagement/Localization/Resources/zh-Hans.json | 1 + .../LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs | 2 +- .../BackgroundJobInfoWaitingPeriodSpecification.cs | 2 +- .../BackgroundJobInfoWaitingSpecification.cs | 2 +- .../LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs | 4 ++-- 16 files changed, 32 insertions(+), 10 deletions(-) diff --git a/apps/vue/src/api/task-management/model/backgroundJobInfoModel.ts b/apps/vue/src/api/task-management/model/backgroundJobInfoModel.ts index 4a2b95ac0..50e1d0f15 100644 --- a/apps/vue/src/api/task-management/model/backgroundJobInfoModel.ts +++ b/apps/vue/src/api/task-management/model/backgroundJobInfoModel.ts @@ -1,6 +1,7 @@ export enum JobStatus { None = -1, Completed = 0, + Queuing = 5, Running = 10, FailedRetry = 15, Paused = 20, diff --git a/apps/vue/src/views/task-management/background-jobs/components/JobModal.vue b/apps/vue/src/views/task-management/background-jobs/components/JobModal.vue index 3a1bbaa0d..646af0d1f 100644 --- a/apps/vue/src/views/task-management/background-jobs/components/JobModal.vue +++ b/apps/vue/src/views/task-management/background-jobs/components/JobModal.vue @@ -54,14 +54,14 @@ { componentProps: { options: [ { label: JobStatusMap[JobStatus.None], value: JobStatus.None }, + { label: JobStatusMap[JobStatus.Queuing], value: JobStatus.Queuing }, { label: JobStatusMap[JobStatus.Running], value: JobStatus.Running }, { label: JobStatusMap[JobStatus.Completed], value: JobStatus.Completed }, + { label: JobStatusMap[JobStatus.FailedRetry], value: JobStatus.FailedRetry }, { label: JobStatusMap[JobStatus.Paused], value: JobStatus.Paused }, { label: JobStatusMap[JobStatus.Stopped], value: JobStatus.Stopped }, ], diff --git a/apps/vue/src/views/task-management/background-jobs/datas/typing.ts b/apps/vue/src/views/task-management/background-jobs/datas/typing.ts index 2da2ba9da..515ed6df4 100644 --- a/apps/vue/src/views/task-management/background-jobs/datas/typing.ts +++ b/apps/vue/src/views/task-management/background-jobs/datas/typing.ts @@ -6,6 +6,7 @@ const { L } = useLocalization('TaskManagement'); export const JobStatusMap = { [JobStatus.None]: L('DisplayName:None'), [JobStatus.Completed]: L('DisplayName:Completed'), + [JobStatus.Queuing]: L('DisplayName:Queuing'), [JobStatus.Running]: L('DisplayName:Running'), [JobStatus.FailedRetry]: L('DisplayName:FailedRetry'), [JobStatus.Paused]: L('DisplayName:Paused'), @@ -14,6 +15,7 @@ export const JobStatusMap = { export const JobStatusColor = { [JobStatus.None]: '', [JobStatus.Completed]: '#339933', + [JobStatus.Queuing]: '#008B8B', [JobStatus.Running]: '#3399CC', [JobStatus.FailedRetry]: '#FF6600', [JobStatus.Paused]: '#CC6633', diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobStatus.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobStatus.cs index 405f7157a..4be36d824 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobStatus.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobStatus.cs @@ -11,6 +11,10 @@ public enum JobStatus /// Completed = 0, /// + /// 队列中 + /// + Queuing = 5, + /// /// 运行中 /// Running = 10, diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs index 33dd38cf0..540519d19 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs @@ -40,7 +40,11 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency jobName = !jobType.IsGenericType ? jobType.Name : jobType.GetGenericArguments()[0].Name; } - Logger.LogWarning($"The task {jobName} could not be performed..."); + // 作业被锁定才记录warn事件 + if (context.TryGetCache("JobLocked", out var time) && time != null && int.TryParse(time.ToString(), out var lockTime)) + { + Logger.LogWarning($"The task {jobName} could not be performed, Because it is being scheduled by another job scheduler"); + } return Task.FromResult(-1); } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobSearchJobAdapter.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobSearchJobAdapter.cs index ff72c7d24..c0ea3720a 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobSearchJobAdapter.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobSearchJobAdapter.cs @@ -30,7 +30,7 @@ public class QuartzJobSearchJobAdapter : IJob jobDefinition.JobType, scope.ServiceProvider, context.MergedJobDataMap.ToImmutableDictionary(), - getCache: (key) => context.Get(key), + getCache: context.Get, setCache: context.Put, cancellationToken: context.CancellationToken); diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs index 177231fe3..e6cd29b9d 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs @@ -49,6 +49,7 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc // 传递令牌将清除本次锁, 那并无意义 if (!await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time)) { + context.Put("JobLocked", time); Logger.LogDebug("The exclusive job is already in use by another scheduler. Ignore this schedule."); return true; } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/Quartz/IJobExecutionContextExtensions.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/Quartz/IJobExecutionContextExtensions.cs index 8772ba717..cfcdf2be3 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/Quartz/IJobExecutionContextExtensions.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/Quartz/IJobExecutionContextExtensions.cs @@ -42,4 +42,11 @@ public static class IJobExecutionContextExtensions } return false; } + + public static bool TryGetCache(this IJobExecutionContext context, string key, out object value) + { + value = context.Get(key); + + return value != null; + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs index ef47712e8..b6ddcf4db 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs @@ -75,7 +75,6 @@ public class BackgroundWorkerAdapter : BackgroundWorkerBase, IBackgroun JobType = JobType.Persistent, Interval = period.Value / 1000, MaxCount = 0, - // TODO: 可配置 MaxTryCount = 10, // 确保不会被轮询入队 Status = JobStatus.None, diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/en.json b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/en.json index 3743ad687..1fc9ec0d9 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/en.json +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/en.json @@ -57,6 +57,7 @@ "DisplayName:None": "None", "DisplayName:Completed": "Completed", "DisplayName:Running": "Running", + "DisplayName:Queuing": "Queuing", "DisplayName:Paused": "Paused", "DisplayName:FailedRetry": "Failed Retry", "DisplayName:Stopped": "Stopped", diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/zh-Hans.json b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/zh-Hans.json index fc39294f0..0e92063df 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/zh-Hans.json +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/zh-Hans.json @@ -57,6 +57,7 @@ "DisplayName:None": "未定义", "DisplayName:Completed": "已完成", "DisplayName:Running": "运行中", + "DisplayName:Queuing": "队列中", "DisplayName:Paused": "已暂停", "DisplayName:FailedRetry": "失败重试", "DisplayName:Stopped": "已停止", diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs index 6c7342c94..871720cc0 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs @@ -141,7 +141,7 @@ public class BackgroundJobInfo : AuditedAggregateRoot, IMultiTenant NodeName = Check.Length(nodeName, nameof(nodeName), BackgroundJobInfoConsts.MaxNodeNameLength); TenantId = tenantId; - Status = JobStatus.Running; + Status = JobStatus.Queuing; // TODO: 是否需要将参数挪到另一个实体? // 任务参数的建议是尽量最小化, 仅存储关键信息 diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingPeriodSpecification.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingPeriodSpecification.cs index 2a5ef8ca8..7e2b3572b 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingPeriodSpecification.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingPeriodSpecification.cs @@ -8,7 +8,7 @@ public class BackgroundJobInfoWaitingPeriodSpecification : BackgroundJobInfoWait { public override Expression> ToExpression() { - var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; + var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry }; Expression> expression = _ => true; return expression diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingSpecification.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingSpecification.cs index e5c3695f3..5a88807d5 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingSpecification.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoWaitingSpecification.cs @@ -9,7 +9,7 @@ public class BackgroundJobInfoWaitingSpecification : Volo.Abp.Specifications.Spe { public override Expression> ToExpression() { - var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; + var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry }; Expression> expression = _ => true; diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs index 6d80257f2..ff0265705 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs @@ -170,7 +170,7 @@ public class BackgroundJobManager : DomainService public async virtual Task ResumeAsync(BackgroundJobInfo jobInfo) { - jobInfo.SetStatus(JobStatus.Running); + jobInfo.SetStatus(JobStatus.Queuing); jobInfo.IsAbandoned = false; jobInfo.IsEnabled = true; @@ -192,7 +192,7 @@ public class BackgroundJobManager : DomainService { foreach (var jobInfo in jobInfos) { - jobInfo.SetStatus(JobStatus.Running); + jobInfo.SetStatus(JobStatus.Queuing); jobInfo.IsAbandoned = false; jobInfo.IsEnabled = true; } From 2a141e40d4c6e0166e470d59bb59163985094565 Mon Sep 17 00:00:00 2001 From: cKey <35512826+colinin@users.noreply.github.com> Date: Mon, 20 Mar 2023 11:40:47 +0800 Subject: [PATCH 2/2] feat(tasks): add job BackgroundCheckingJob. --- .../AbpBackgroundTasksOptions.cs | 31 +++++++ .../DisableJobActionAttribute.cs | 4 +- .../DisableJobStatusAttribute.cs | 10 +++ .../LINGYUN/Abp/BackgroundTasks/IJobStore.cs | 4 + .../Internal/BackgroundCheckingJob.cs | 57 ++++++++++++ .../Internal/DefaultBackgroundWorker.cs | 34 +++++++ .../Internal/InMemoryJobStore.cs | 89 ++++++++++++------- .../Internal/JobExecutedEvent.cs | 4 + .../Abp/TaskManagement/BackgroundJobStore.cs | 14 +++ .../EventBus/Handlers/TenantSynchronizer.cs | 30 +++++++ 10 files changed, 246 insertions(+), 31 deletions(-) create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs index 8f69bde53..313b68bd6 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs @@ -85,6 +85,32 @@ public class AbpBackgroundTasksOptions /// 轮询任务也属于一个后台任务, 需要对每一次轮询加锁,防止重复任务入库 /// public int JobFetchLockTimeOut { get; set; } + + /// + /// 启用检查任务 + /// + /// + /// 主节点启用 + /// + public bool JobCheckEnabled { get; set; } + /// + /// 每次检查任务批次大小 + /// 默认: 100 + /// + public int MaxJobCheckCount { get; set; } + /// + /// 检查任务批次Cron表达式 + /// 默认: 每2小时执行一次(0 0 0/2 * * ? ) + /// + /// + /// Cron表达式 + /// + public string JobCheckCronExpression { get; set; } + /// + /// 检查任务批次时锁定任务超时时长(秒) + /// 默认:120 + /// + public int JobCheckLockTimeOut { get; set; } /// /// 指定运行节点 /// @@ -96,6 +122,11 @@ public class AbpBackgroundTasksOptions JobFetchLockTimeOut = 120; JobFetchCronExpression = "0/30 * * * * ? "; + JobCheckEnabled = false; + MaxJobCheckCount = 100; + JobCheckLockTimeOut = 120; + JobCheckCronExpression = "0 0 0/2 * * ? "; + JobCleanEnabled = false; MaxJobCleanCount = 1000; JobExpiratime = TimeSpan.FromDays(15d); diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs index e9be68be0..88f56bf6c 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs @@ -1,7 +1,9 @@ using System; namespace LINGYUN.Abp.BackgroundTasks; - +/// +/// 禁用作业调度行为 +/// [AttributeUsage(AttributeTargets.Class)] public class DisableJobActionAttribute : Attribute { diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs new file mode 100644 index 000000000..1d213c2bb --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs @@ -0,0 +1,10 @@ +using System; + +namespace LINGYUN.Abp.BackgroundTasks; +/// +/// 禁用作业调度状态 +/// +[AttributeUsage(AttributeTargets.Class)] +public class DisableJobStatusAttribute : Attribute +{ +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs index 9d408f046..95fa9d6ae 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs @@ -7,6 +7,10 @@ namespace LINGYUN.Abp.BackgroundTasks; public interface IJobStore { + Task> GetRuningListAsync( + int maxResultCount, + CancellationToken cancellationToken = default); + Task> GetWaitingListAsync( int maxResultCount, CancellationToken cancellationToken = default); diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs new file mode 100644 index 000000000..bfb9eb553 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Linq; +using System.Threading.Tasks; +using Volo.Abp.Auditing; +using Volo.Abp.MultiTenancy; + +namespace LINGYUN.Abp.BackgroundTasks.Internal; + +[DisableAuditing] +[DisableJobAction] +public class BackgroundCheckingJob : IJobRunnable +{ + public async virtual Task ExecuteAsync(JobRunnableContext context) + { + try + { + var options = context.ServiceProvider.GetRequiredService>().Value; + var store = context.ServiceProvider.GetRequiredService(); + var currentTenant = context.ServiceProvider.GetRequiredService(); + + context.TryGetMultiTenantId(out var tenantId); + + using (currentTenant.Change(tenantId)) + { + var runingTasks = await store.GetRuningListAsync( + options.MaxJobCheckCount, context.CancellationToken); + + if (!runingTasks.Any()) + { + return; + } + + var jobScheduler = context.ServiceProvider.GetRequiredService(); + + foreach (var job in runingTasks) + { + // 当标记为运行中的作业不在调度器中时,改变为已停止作业 + if (!await jobScheduler.ExistsAsync(job, context.CancellationToken)) + { + job.Status = JobStatus.Stopped; + + await store.StoreAsync(job); + } + } + } + } + catch(Exception ex) + { + context.ServiceProvider + .GetService>() + ?.LogError(ex, "check job status error."); + } + } +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs index b38030783..61c00ad36 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs @@ -27,6 +27,7 @@ internal class DefaultBackgroundWorker : BackgroundService { await QueuePollingJob(); await QueueCleaningJob(); + await QueueCheckingJob(); } private async Task QueuePollingJob() @@ -47,6 +48,17 @@ internal class DefaultBackgroundWorker : BackgroundService } } + private async Task QueueCheckingJob() + { + if (_options.JobCheckEnabled) + { + var checkingJob = BuildCheckingJobInfo(); + await _jobPublisher.PublishAsync(checkingJob); + } + } + + + private JobInfo BuildPollingJobInfo() { return new JobInfo @@ -89,4 +101,26 @@ internal class DefaultBackgroundWorker : BackgroundService Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName, }; } + + private JobInfo BuildCheckingJobInfo() + { + return new JobInfo + { + Id = "Checking", + Name = nameof(BackgroundCheckingJob), + Group = "Checking", + Description = "Checking tasks to be executed", + Args = new Dictionary(), + Status = JobStatus.Running, + BeginTime = DateTime.Now, + CreationTime = DateTime.Now, + Cron = _options.JobCheckCronExpression, + LockTimeOut = _options.JobCheckLockTimeOut, + JobType = JobType.Period, + Priority = JobPriority.High, + Source = JobSource.System, + NodeName = _options.NodeName, + Type = typeof(BackgroundCheckingJob).AssemblyQualifiedName, + }; + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs index 69d292b8c..47769a2c7 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs @@ -11,17 +11,18 @@ namespace LINGYUN.Abp.BackgroundTasks.Internal; internal class InMemoryJobStore : IJobStore, ISingletonDependency { private readonly List _memoryJobStore; + private readonly static object _jobSync = new(); public InMemoryJobStore() { _memoryJobStore = new List(); } - public Task> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default) + public virtual Task> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; + var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry }; var jobs = _memoryJobStore .Where(x => !x.IsAbandoned) @@ -33,12 +34,26 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency return Task.FromResult(jobs); } - public Task> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default) + public virtual Task> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + var status = new JobStatus[] { JobStatus.Running }; + + var jobs = _memoryJobStore + .Where(x => status.Contains(x.Status)) + .Take(maxResultCount) + .ToList(); + + return Task.FromResult(jobs); + } + + public virtual Task> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); var now = DateTime.Now; - var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; + var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry }; var jobs = _memoryJobStore .Where(x => !x.IsAbandoned) @@ -63,62 +78,76 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency return Task.FromResult(job); } - public Task StoreAsync( + public virtual Task StoreAsync( JobInfo jobInfo, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobInfo.Id)); - if (job != null) + lock(_jobSync) { - job.NextRunTime = jobInfo.NextRunTime; - job.LastRunTime = jobInfo.LastRunTime; - job.Status = jobInfo.Status; - job.TriggerCount = jobInfo.TriggerCount; - job.TryCount = jobInfo.TryCount; - job.IsAbandoned = jobInfo.IsAbandoned; - } - else - { - _memoryJobStore.Add(jobInfo); + var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobInfo.Id)); + if (job != null) + { + job.NextRunTime = jobInfo.NextRunTime; + job.LastRunTime = jobInfo.LastRunTime; + job.Status = jobInfo.Status; + job.TriggerCount = jobInfo.TriggerCount; + job.TryCount = jobInfo.TryCount; + job.IsAbandoned = jobInfo.IsAbandoned; + } + else + { + _memoryJobStore.Add(jobInfo); + } } + return Task.CompletedTask; } - public async Task RemoveAsync( + public virtual Task RemoveAsync( string jobId, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - var job = await FindAsync(jobId, cancellationToken); - if (job != null) + lock (_jobSync) { - _memoryJobStore.Remove(job); + var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobId)); + if (job != null) + { + _memoryJobStore.Remove(job); + } } + + return Task.CompletedTask; } - public Task StoreLogAsync(JobEventData eventData) + public virtual Task StoreLogAsync(JobEventData eventData) { eventData.CancellationToken.ThrowIfCancellationRequested(); return Task.CompletedTask; } - public Task> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default) + public virtual Task> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - var expiratime = DateTime.Now - jobExpiratime; + var expriaJobs = new List(); - var expriaJobs = _memoryJobStore - .Where(x => x.Status == JobStatus.Completed && - expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0) - .Take(maxResultCount) - .ToList(); + lock (_jobSync) + { + var expiratime = DateTime.Now - jobExpiratime; - _memoryJobStore.RemoveAll(expriaJobs); + expriaJobs = _memoryJobStore + .Where(x => x.Status == JobStatus.Completed && + expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0) + .Take(maxResultCount) + .ToList(); + + _memoryJobStore.RemoveAll(expriaJobs); + } return Task.FromResult(expriaJobs); } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs index 9f7837a02..14dd63627 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs @@ -10,6 +10,10 @@ public class JobExecutedEvent : JobEventBase, ITransientDepend { protected override async Task OnJobAfterExecutedAsync(JobEventContext context) { + if (context.EventData.Type.IsDefined(typeof(DisableJobStatusAttribute), true)) + { + return; + } var store = context.ServiceProvider.GetRequiredService(); var job = await store.FindAsync(context.EventData.Key, context.EventData.CancellationToken); if (job != null) diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs index b9001ad5f..df1e7ac23 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs @@ -40,6 +40,20 @@ public class BackgroundJobStore : IJobStore, ITransientDependency return ObjectMapper.Map, List>(jobInfos); } + public async virtual Task> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default) + { + var filter = new BackgroundJobInfoFilter + { + Status = JobStatus.Running + }; + var specification = new BackgroundJobInfoSpecification(filter); + + var jobInfos = await JobInfoRepository.GetListAsync( + specification, maxResultCount: maxResultCount, cancellationToken: cancellationToken); + + return ObjectMapper.Map, List>(jobInfos); + } + public async virtual Task> GetWaitingListAsync( int maxResultCount, CancellationToken cancellationToken = default) diff --git a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs index a59768ca4..d890c70ba 100644 --- a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs +++ b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs @@ -60,6 +60,10 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers var cleaningJob = BuildCleaningJobInfo(eventData.Entity.Id, eventData.Entity.Name); await JobScheduler.RemoveAsync(cleaningJob); await JobStore.RemoveAsync(cleaningJob.Id); + + var checkingJob = BuildCheckingJobInfo(eventData.Entity.Id, eventData.Entity.Name); + await JobScheduler.RemoveAsync(checkingJob); + await JobStore.RemoveAsync(checkingJob.Id); } public async Task HandleEventAsync(CreateEventData eventData) @@ -79,6 +83,10 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers var cleaningJob = BuildCleaningJobInfo(eventData.Id, eventData.Name); await JobStore.StoreAsync(cleaningJob); await JobScheduler.QueueAsync(cleaningJob); + + var checkingJob = BuildCheckingJobInfo(eventData.Id, eventData.Name); + await JobStore.StoreAsync(checkingJob); + await JobScheduler.QueueAsync(checkingJob); } private async Task MigrateAsync(CreateEventData eventData) @@ -145,5 +153,27 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName, }; } + + private JobInfo BuildCheckingJobInfo(Guid tenantId, string tenantName) + { + return new JobInfo + { + Id = tenantId.ToString() + "_Checking", + Name = nameof(BackgroundCheckingJob), + Group = "Checking", + Description = "Checking tasks to be executed", + Args = new Dictionary() { { nameof(JobInfo.TenantId), tenantId } }, + Status = JobStatus.Running, + BeginTime = DateTime.Now, + CreationTime = DateTime.Now, + Cron = Options.JobCheckCronExpression, + LockTimeOut = Options.JobCheckLockTimeOut, + JobType = JobType.Period, + Priority = JobPriority.High, + Source = JobSource.System, + TenantId = tenantId, + Type = typeof(BackgroundCheckingJob).AssemblyQualifiedName, + }; + } } }