From 2a141e40d4c6e0166e470d59bb59163985094565 Mon Sep 17 00:00:00 2001
From: cKey <35512826+colinin@users.noreply.github.com>
Date: Mon, 20 Mar 2023 11:40:47 +0800
Subject: [PATCH] feat(tasks): add job BackgroundCheckingJob.
---
.../AbpBackgroundTasksOptions.cs | 31 +++++++
.../DisableJobActionAttribute.cs | 4 +-
.../DisableJobStatusAttribute.cs | 10 +++
.../LINGYUN/Abp/BackgroundTasks/IJobStore.cs | 4 +
.../Internal/BackgroundCheckingJob.cs | 57 ++++++++++++
.../Internal/DefaultBackgroundWorker.cs | 34 +++++++
.../Internal/InMemoryJobStore.cs | 89 ++++++++++++-------
.../Internal/JobExecutedEvent.cs | 4 +
.../Abp/TaskManagement/BackgroundJobStore.cs | 14 +++
.../EventBus/Handlers/TenantSynchronizer.cs | 30 +++++++
10 files changed, 246 insertions(+), 31 deletions(-)
create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs
create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs
index 8f69bde53..313b68bd6 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs
@@ -85,6 +85,32 @@ public class AbpBackgroundTasksOptions
/// 轮询任务也属于一个后台任务, 需要对每一次轮询加锁,防止重复任务入库
///
public int JobFetchLockTimeOut { get; set; }
+
+ ///
+ /// 启用检查任务
+ ///
+ ///
+ /// 主节点启用
+ ///
+ public bool JobCheckEnabled { get; set; }
+ ///
+ /// 每次检查任务批次大小
+ /// 默认: 100
+ ///
+ public int MaxJobCheckCount { get; set; }
+ ///
+ /// 检查任务批次Cron表达式
+ /// 默认: 每2小时执行一次(0 0 0/2 * * ? )
+ ///
+ ///
+ /// Cron表达式
+ ///
+ public string JobCheckCronExpression { get; set; }
+ ///
+ /// 检查任务批次时锁定任务超时时长(秒)
+ /// 默认:120
+ ///
+ public int JobCheckLockTimeOut { get; set; }
///
/// 指定运行节点
///
@@ -96,6 +122,11 @@ public class AbpBackgroundTasksOptions
JobFetchLockTimeOut = 120;
JobFetchCronExpression = "0/30 * * * * ? ";
+ JobCheckEnabled = false;
+ MaxJobCheckCount = 100;
+ JobCheckLockTimeOut = 120;
+ JobCheckCronExpression = "0 0 0/2 * * ? ";
+
JobCleanEnabled = false;
MaxJobCleanCount = 1000;
JobExpiratime = TimeSpan.FromDays(15d);
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs
index e9be68be0..88f56bf6c 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs
@@ -1,7 +1,9 @@
using System;
namespace LINGYUN.Abp.BackgroundTasks;
-
+///
+/// 禁用作业调度行为
+///
[AttributeUsage(AttributeTargets.Class)]
public class DisableJobActionAttribute : Attribute
{
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs
new file mode 100644
index 000000000..1d213c2bb
--- /dev/null
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs
@@ -0,0 +1,10 @@
+using System;
+
+namespace LINGYUN.Abp.BackgroundTasks;
+///
+/// 禁用作业调度状态
+///
+[AttributeUsage(AttributeTargets.Class)]
+public class DisableJobStatusAttribute : Attribute
+{
+}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs
index 9d408f046..95fa9d6ae 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs
@@ -7,6 +7,10 @@ namespace LINGYUN.Abp.BackgroundTasks;
public interface IJobStore
{
+ Task> GetRuningListAsync(
+ int maxResultCount,
+ CancellationToken cancellationToken = default);
+
Task> GetWaitingListAsync(
int maxResultCount,
CancellationToken cancellationToken = default);
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs
new file mode 100644
index 000000000..bfb9eb553
--- /dev/null
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs
@@ -0,0 +1,57 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using Volo.Abp.Auditing;
+using Volo.Abp.MultiTenancy;
+
+namespace LINGYUN.Abp.BackgroundTasks.Internal;
+
+[DisableAuditing]
+[DisableJobAction]
+public class BackgroundCheckingJob : IJobRunnable
+{
+ public async virtual Task ExecuteAsync(JobRunnableContext context)
+ {
+ try
+ {
+ var options = context.ServiceProvider.GetRequiredService>().Value;
+ var store = context.ServiceProvider.GetRequiredService();
+ var currentTenant = context.ServiceProvider.GetRequiredService();
+
+ context.TryGetMultiTenantId(out var tenantId);
+
+ using (currentTenant.Change(tenantId))
+ {
+ var runingTasks = await store.GetRuningListAsync(
+ options.MaxJobCheckCount, context.CancellationToken);
+
+ if (!runingTasks.Any())
+ {
+ return;
+ }
+
+ var jobScheduler = context.ServiceProvider.GetRequiredService();
+
+ foreach (var job in runingTasks)
+ {
+ // 当标记为运行中的作业不在调度器中时,改变为已停止作业
+ if (!await jobScheduler.ExistsAsync(job, context.CancellationToken))
+ {
+ job.Status = JobStatus.Stopped;
+
+ await store.StoreAsync(job);
+ }
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ context.ServiceProvider
+ .GetService>()
+ ?.LogError(ex, "check job status error.");
+ }
+ }
+}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs
index b38030783..61c00ad36 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs
@@ -27,6 +27,7 @@ internal class DefaultBackgroundWorker : BackgroundService
{
await QueuePollingJob();
await QueueCleaningJob();
+ await QueueCheckingJob();
}
private async Task QueuePollingJob()
@@ -47,6 +48,17 @@ internal class DefaultBackgroundWorker : BackgroundService
}
}
+ private async Task QueueCheckingJob()
+ {
+ if (_options.JobCheckEnabled)
+ {
+ var checkingJob = BuildCheckingJobInfo();
+ await _jobPublisher.PublishAsync(checkingJob);
+ }
+ }
+
+
+
private JobInfo BuildPollingJobInfo()
{
return new JobInfo
@@ -89,4 +101,26 @@ internal class DefaultBackgroundWorker : BackgroundService
Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName,
};
}
+
+ private JobInfo BuildCheckingJobInfo()
+ {
+ return new JobInfo
+ {
+ Id = "Checking",
+ Name = nameof(BackgroundCheckingJob),
+ Group = "Checking",
+ Description = "Checking tasks to be executed",
+ Args = new Dictionary(),
+ Status = JobStatus.Running,
+ BeginTime = DateTime.Now,
+ CreationTime = DateTime.Now,
+ Cron = _options.JobCheckCronExpression,
+ LockTimeOut = _options.JobCheckLockTimeOut,
+ JobType = JobType.Period,
+ Priority = JobPriority.High,
+ Source = JobSource.System,
+ NodeName = _options.NodeName,
+ Type = typeof(BackgroundCheckingJob).AssemblyQualifiedName,
+ };
+ }
}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
index 69d292b8c..47769a2c7 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
@@ -11,17 +11,18 @@ namespace LINGYUN.Abp.BackgroundTasks.Internal;
internal class InMemoryJobStore : IJobStore, ISingletonDependency
{
private readonly List _memoryJobStore;
+ private readonly static object _jobSync = new();
public InMemoryJobStore()
{
_memoryJobStore = new List();
}
- public Task> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
+ public virtual Task> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
- var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
+ var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry };
var jobs = _memoryJobStore
.Where(x => !x.IsAbandoned)
@@ -33,12 +34,26 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
return Task.FromResult(jobs);
}
- public Task> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
+ public virtual Task> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var status = new JobStatus[] { JobStatus.Running };
+
+ var jobs = _memoryJobStore
+ .Where(x => status.Contains(x.Status))
+ .Take(maxResultCount)
+ .ToList();
+
+ return Task.FromResult(jobs);
+ }
+
+ public virtual Task> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var now = DateTime.Now;
- var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
+ var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry };
var jobs = _memoryJobStore
.Where(x => !x.IsAbandoned)
@@ -63,62 +78,76 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
return Task.FromResult(job);
}
- public Task StoreAsync(
+ public virtual Task StoreAsync(
JobInfo jobInfo,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
- var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobInfo.Id));
- if (job != null)
+ lock(_jobSync)
{
- job.NextRunTime = jobInfo.NextRunTime;
- job.LastRunTime = jobInfo.LastRunTime;
- job.Status = jobInfo.Status;
- job.TriggerCount = jobInfo.TriggerCount;
- job.TryCount = jobInfo.TryCount;
- job.IsAbandoned = jobInfo.IsAbandoned;
- }
- else
- {
- _memoryJobStore.Add(jobInfo);
+ var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobInfo.Id));
+ if (job != null)
+ {
+ job.NextRunTime = jobInfo.NextRunTime;
+ job.LastRunTime = jobInfo.LastRunTime;
+ job.Status = jobInfo.Status;
+ job.TriggerCount = jobInfo.TriggerCount;
+ job.TryCount = jobInfo.TryCount;
+ job.IsAbandoned = jobInfo.IsAbandoned;
+ }
+ else
+ {
+ _memoryJobStore.Add(jobInfo);
+ }
}
+
return Task.CompletedTask;
}
- public async Task RemoveAsync(
+ public virtual Task RemoveAsync(
string jobId,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
- var job = await FindAsync(jobId, cancellationToken);
- if (job != null)
+ lock (_jobSync)
{
- _memoryJobStore.Remove(job);
+ var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobId));
+ if (job != null)
+ {
+ _memoryJobStore.Remove(job);
+ }
}
+
+ return Task.CompletedTask;
}
- public Task StoreLogAsync(JobEventData eventData)
+ public virtual Task StoreLogAsync(JobEventData eventData)
{
eventData.CancellationToken.ThrowIfCancellationRequested();
return Task.CompletedTask;
}
- public Task> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default)
+ public virtual Task> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
- var expiratime = DateTime.Now - jobExpiratime;
+ var expriaJobs = new List();
- var expriaJobs = _memoryJobStore
- .Where(x => x.Status == JobStatus.Completed &&
- expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
- .Take(maxResultCount)
- .ToList();
+ lock (_jobSync)
+ {
+ var expiratime = DateTime.Now - jobExpiratime;
- _memoryJobStore.RemoveAll(expriaJobs);
+ expriaJobs = _memoryJobStore
+ .Where(x => x.Status == JobStatus.Completed &&
+ expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
+ .Take(maxResultCount)
+ .ToList();
+
+ _memoryJobStore.RemoveAll(expriaJobs);
+ }
return Task.FromResult(expriaJobs);
}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs
index 9f7837a02..14dd63627 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs
@@ -10,6 +10,10 @@ public class JobExecutedEvent : JobEventBase, ITransientDepend
{
protected override async Task OnJobAfterExecutedAsync(JobEventContext context)
{
+ if (context.EventData.Type.IsDefined(typeof(DisableJobStatusAttribute), true))
+ {
+ return;
+ }
var store = context.ServiceProvider.GetRequiredService();
var job = await store.FindAsync(context.EventData.Key, context.EventData.CancellationToken);
if (job != null)
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs
index b9001ad5f..df1e7ac23 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs
@@ -40,6 +40,20 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
return ObjectMapper.Map, List>(jobInfos);
}
+ public async virtual Task> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default)
+ {
+ var filter = new BackgroundJobInfoFilter
+ {
+ Status = JobStatus.Running
+ };
+ var specification = new BackgroundJobInfoSpecification(filter);
+
+ var jobInfos = await JobInfoRepository.GetListAsync(
+ specification, maxResultCount: maxResultCount, cancellationToken: cancellationToken);
+
+ return ObjectMapper.Map, List>(jobInfos);
+ }
+
public async virtual Task> GetWaitingListAsync(
int maxResultCount,
CancellationToken cancellationToken = default)
diff --git a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs
index a59768ca4..d890c70ba 100644
--- a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs
+++ b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs
@@ -60,6 +60,10 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers
var cleaningJob = BuildCleaningJobInfo(eventData.Entity.Id, eventData.Entity.Name);
await JobScheduler.RemoveAsync(cleaningJob);
await JobStore.RemoveAsync(cleaningJob.Id);
+
+ var checkingJob = BuildCheckingJobInfo(eventData.Entity.Id, eventData.Entity.Name);
+ await JobScheduler.RemoveAsync(checkingJob);
+ await JobStore.RemoveAsync(checkingJob.Id);
}
public async Task HandleEventAsync(CreateEventData eventData)
@@ -79,6 +83,10 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers
var cleaningJob = BuildCleaningJobInfo(eventData.Id, eventData.Name);
await JobStore.StoreAsync(cleaningJob);
await JobScheduler.QueueAsync(cleaningJob);
+
+ var checkingJob = BuildCheckingJobInfo(eventData.Id, eventData.Name);
+ await JobStore.StoreAsync(checkingJob);
+ await JobScheduler.QueueAsync(checkingJob);
}
private async Task MigrateAsync(CreateEventData eventData)
@@ -145,5 +153,27 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers
Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName,
};
}
+
+ private JobInfo BuildCheckingJobInfo(Guid tenantId, string tenantName)
+ {
+ return new JobInfo
+ {
+ Id = tenantId.ToString() + "_Checking",
+ Name = nameof(BackgroundCheckingJob),
+ Group = "Checking",
+ Description = "Checking tasks to be executed",
+ Args = new Dictionary() { { nameof(JobInfo.TenantId), tenantId } },
+ Status = JobStatus.Running,
+ BeginTime = DateTime.Now,
+ CreationTime = DateTime.Now,
+ Cron = Options.JobCheckCronExpression,
+ LockTimeOut = Options.JobCheckLockTimeOut,
+ JobType = JobType.Period,
+ Priority = JobPriority.High,
+ Source = JobSource.System,
+ TenantId = tenantId,
+ Type = typeof(BackgroundCheckingJob).AssemblyQualifiedName,
+ };
+ }
}
}