Browse Source

feat(tasks): add job BackgroundCheckingJob.

pull/804/head
cKey 3 years ago
parent
commit
2a141e40d4
  1. 31
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs
  2. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs
  3. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs
  4. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs
  5. 57
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs
  6. 34
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs
  7. 89
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
  8. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs
  9. 14
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs
  10. 30
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs

31
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs

@ -85,6 +85,32 @@ public class AbpBackgroundTasksOptions
/// 轮询任务也属于一个后台任务, 需要对每一次轮询加锁,防止重复任务入库
/// </remarks>
public int JobFetchLockTimeOut { get; set; }
/// <summary>
/// 启用检查任务
/// </summary>
/// <remarks>
/// 主节点启用
/// </remarks>
public bool JobCheckEnabled { get; set; }
/// <summary>
/// 每次检查任务批次大小
/// 默认: 100
/// </summary>
public int MaxJobCheckCount { get; set; }
/// <summary>
/// 检查任务批次Cron表达式
/// 默认: 每2小时执行一次(0 0 0/2 * * ? )
/// </summary>
/// <remarks>
/// Cron表达式
/// </remarks>
public string JobCheckCronExpression { get; set; }
/// <summary>
/// 检查任务批次时锁定任务超时时长(秒)
/// 默认:120
/// </summary>
public int JobCheckLockTimeOut { get; set; }
/// <summary>
/// 指定运行节点
/// </summary>
@ -96,6 +122,11 @@ public class AbpBackgroundTasksOptions
JobFetchLockTimeOut = 120;
JobFetchCronExpression = "0/30 * * * * ? ";
JobCheckEnabled = false;
MaxJobCheckCount = 100;
JobCheckLockTimeOut = 120;
JobCheckCronExpression = "0 0 0/2 * * ? ";
JobCleanEnabled = false;
MaxJobCleanCount = 1000;
JobExpiratime = TimeSpan.FromDays(15d);

4
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobActionAttribute.cs

@ -1,7 +1,9 @@
using System;
namespace LINGYUN.Abp.BackgroundTasks;
/// <summary>
/// 禁用作业调度行为
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class DisableJobActionAttribute : Attribute
{

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DisableJobStatusAttribute.cs

@ -0,0 +1,10 @@
using System;
namespace LINGYUN.Abp.BackgroundTasks;
/// <summary>
/// 禁用作业调度状态
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class DisableJobStatusAttribute : Attribute
{
}

4
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs

@ -7,6 +7,10 @@ namespace LINGYUN.Abp.BackgroundTasks;
public interface IJobStore
{
Task<List<JobInfo>> GetRuningListAsync(
int maxResultCount,
CancellationToken cancellationToken = default);
Task<List<JobInfo>> GetWaitingListAsync(
int maxResultCount,
CancellationToken cancellationToken = default);

57
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs

@ -0,0 +1,57 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Auditing;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks.Internal;
[DisableAuditing]
[DisableJobAction]
public class BackgroundCheckingJob : IJobRunnable
{
public async virtual Task ExecuteAsync(JobRunnableContext context)
{
try
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundTasksOptions>>().Value;
var store = context.ServiceProvider.GetRequiredService<IJobStore>();
var currentTenant = context.ServiceProvider.GetRequiredService<ICurrentTenant>();
context.TryGetMultiTenantId(out var tenantId);
using (currentTenant.Change(tenantId))
{
var runingTasks = await store.GetRuningListAsync(
options.MaxJobCheckCount, context.CancellationToken);
if (!runingTasks.Any())
{
return;
}
var jobScheduler = context.ServiceProvider.GetRequiredService<IJobScheduler>();
foreach (var job in runingTasks)
{
// 当标记为运行中的作业不在调度器中时,改变为已停止作业
if (!await jobScheduler.ExistsAsync(job, context.CancellationToken))
{
job.Status = JobStatus.Stopped;
await store.StoreAsync(job);
}
}
}
}
catch(Exception ex)
{
context.ServiceProvider
.GetService<ILogger<BackgroundCheckingJob>>()
?.LogError(ex, "check job status error.");
}
}
}

34
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs

@ -27,6 +27,7 @@ internal class DefaultBackgroundWorker : BackgroundService
{
await QueuePollingJob();
await QueueCleaningJob();
await QueueCheckingJob();
}
private async Task QueuePollingJob()
@ -47,6 +48,17 @@ internal class DefaultBackgroundWorker : BackgroundService
}
}
private async Task QueueCheckingJob()
{
if (_options.JobCheckEnabled)
{
var checkingJob = BuildCheckingJobInfo();
await _jobPublisher.PublishAsync(checkingJob);
}
}
private JobInfo BuildPollingJobInfo()
{
return new JobInfo
@ -89,4 +101,26 @@ internal class DefaultBackgroundWorker : BackgroundService
Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName,
};
}
private JobInfo BuildCheckingJobInfo()
{
return new JobInfo
{
Id = "Checking",
Name = nameof(BackgroundCheckingJob),
Group = "Checking",
Description = "Checking tasks to be executed",
Args = new Dictionary<string, object>(),
Status = JobStatus.Running,
BeginTime = DateTime.Now,
CreationTime = DateTime.Now,
Cron = _options.JobCheckCronExpression,
LockTimeOut = _options.JobCheckLockTimeOut,
JobType = JobType.Period,
Priority = JobPriority.High,
Source = JobSource.System,
NodeName = _options.NodeName,
Type = typeof(BackgroundCheckingJob).AssemblyQualifiedName,
};
}
}

89
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs

@ -11,17 +11,18 @@ namespace LINGYUN.Abp.BackgroundTasks.Internal;
internal class InMemoryJobStore : IJobStore, ISingletonDependency
{
private readonly List<JobInfo> _memoryJobStore;
private readonly static object _jobSync = new();
public InMemoryJobStore()
{
_memoryJobStore = new List<JobInfo>();
}
public Task<List<JobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
public virtual Task<List<JobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry };
var jobs = _memoryJobStore
.Where(x => !x.IsAbandoned)
@ -33,12 +34,26 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
return Task.FromResult(jobs);
}
public Task<List<JobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
public virtual Task<List<JobInfo>> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var status = new JobStatus[] { JobStatus.Running };
var jobs = _memoryJobStore
.Where(x => status.Contains(x.Status))
.Take(maxResultCount)
.ToList();
return Task.FromResult(jobs);
}
public virtual Task<List<JobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var now = DateTime.Now;
var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
var status = new JobStatus[] { JobStatus.Queuing, JobStatus.FailedRetry };
var jobs = _memoryJobStore
.Where(x => !x.IsAbandoned)
@ -63,62 +78,76 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
return Task.FromResult(job);
}
public Task StoreAsync(
public virtual Task StoreAsync(
JobInfo jobInfo,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobInfo.Id));
if (job != null)
lock(_jobSync)
{
job.NextRunTime = jobInfo.NextRunTime;
job.LastRunTime = jobInfo.LastRunTime;
job.Status = jobInfo.Status;
job.TriggerCount = jobInfo.TriggerCount;
job.TryCount = jobInfo.TryCount;
job.IsAbandoned = jobInfo.IsAbandoned;
}
else
{
_memoryJobStore.Add(jobInfo);
var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobInfo.Id));
if (job != null)
{
job.NextRunTime = jobInfo.NextRunTime;
job.LastRunTime = jobInfo.LastRunTime;
job.Status = jobInfo.Status;
job.TriggerCount = jobInfo.TriggerCount;
job.TryCount = jobInfo.TryCount;
job.IsAbandoned = jobInfo.IsAbandoned;
}
else
{
_memoryJobStore.Add(jobInfo);
}
}
return Task.CompletedTask;
}
public async Task RemoveAsync(
public virtual Task RemoveAsync(
string jobId,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var job = await FindAsync(jobId, cancellationToken);
if (job != null)
lock (_jobSync)
{
_memoryJobStore.Remove(job);
var job = _memoryJobStore.FirstOrDefault(x => x.Id.Equals(jobId));
if (job != null)
{
_memoryJobStore.Remove(job);
}
}
return Task.CompletedTask;
}
public Task StoreLogAsync(JobEventData eventData)
public virtual Task StoreLogAsync(JobEventData eventData)
{
eventData.CancellationToken.ThrowIfCancellationRequested();
return Task.CompletedTask;
}
public Task<List<JobInfo>> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default)
public virtual Task<List<JobInfo>> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var expiratime = DateTime.Now - jobExpiratime;
var expriaJobs = new List<JobInfo>();
var expriaJobs = _memoryJobStore
.Where(x => x.Status == JobStatus.Completed &&
expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
.Take(maxResultCount)
.ToList();
lock (_jobSync)
{
var expiratime = DateTime.Now - jobExpiratime;
_memoryJobStore.RemoveAll(expriaJobs);
expriaJobs = _memoryJobStore
.Where(x => x.Status == JobStatus.Completed &&
expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
.Take(maxResultCount)
.ToList();
_memoryJobStore.RemoveAll(expriaJobs);
}
return Task.FromResult(expriaJobs);
}

4
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs

@ -10,6 +10,10 @@ public class JobExecutedEvent : JobEventBase<JobExecutedEvent>, ITransientDepend
{
protected override async Task OnJobAfterExecutedAsync(JobEventContext context)
{
if (context.EventData.Type.IsDefined(typeof(DisableJobStatusAttribute), true))
{
return;
}
var store = context.ServiceProvider.GetRequiredService<IJobStore>();
var job = await store.FindAsync(context.EventData.Key, context.EventData.CancellationToken);
if (job != null)

14
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs

@ -40,6 +40,20 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos);
}
public async virtual Task<List<JobInfo>> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{
var filter = new BackgroundJobInfoFilter
{
Status = JobStatus.Running
};
var specification = new BackgroundJobInfoSpecification(filter);
var jobInfos = await JobInfoRepository.GetListAsync(
specification, maxResultCount: maxResultCount, cancellationToken: cancellationToken);
return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos);
}
public async virtual Task<List<JobInfo>> GetWaitingListAsync(
int maxResultCount,
CancellationToken cancellationToken = default)

30
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/EventBus/Handlers/TenantSynchronizer.cs

@ -60,6 +60,10 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers
var cleaningJob = BuildCleaningJobInfo(eventData.Entity.Id, eventData.Entity.Name);
await JobScheduler.RemoveAsync(cleaningJob);
await JobStore.RemoveAsync(cleaningJob.Id);
var checkingJob = BuildCheckingJobInfo(eventData.Entity.Id, eventData.Entity.Name);
await JobScheduler.RemoveAsync(checkingJob);
await JobStore.RemoveAsync(checkingJob.Id);
}
public async Task HandleEventAsync(CreateEventData eventData)
@ -79,6 +83,10 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers
var cleaningJob = BuildCleaningJobInfo(eventData.Id, eventData.Name);
await JobStore.StoreAsync(cleaningJob);
await JobScheduler.QueueAsync(cleaningJob);
var checkingJob = BuildCheckingJobInfo(eventData.Id, eventData.Name);
await JobStore.StoreAsync(checkingJob);
await JobScheduler.QueueAsync(checkingJob);
}
private async Task MigrateAsync(CreateEventData eventData)
@ -145,5 +153,27 @@ namespace LY.MicroService.TaskManagement.EventBus.Handlers
Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName,
};
}
private JobInfo BuildCheckingJobInfo(Guid tenantId, string tenantName)
{
return new JobInfo
{
Id = tenantId.ToString() + "_Checking",
Name = nameof(BackgroundCheckingJob),
Group = "Checking",
Description = "Checking tasks to be executed",
Args = new Dictionary<string, object>() { { nameof(JobInfo.TenantId), tenantId } },
Status = JobStatus.Running,
BeginTime = DateTime.Now,
CreationTime = DateTime.Now,
Cron = Options.JobCheckCronExpression,
LockTimeOut = Options.JobCheckLockTimeOut,
JobType = JobType.Period,
Priority = JobPriority.High,
Source = JobSource.System,
TenantId = tenantId,
Type = typeof(BackgroundCheckingJob).AssemblyQualifiedName,
};
}
}
}

Loading…
Cancel
Save