Browse Source

refactor(background-tasks): Refactor the built-in jobs

- Handle the job status of other nodes through events
pull/1376/head
colin 4 months ago
parent
commit
a57bdb4ead
  1. 24
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/DistributedJobDispatcher.cs
  2. 12
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCheckRuningEventData.cs
  3. 12
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCleanupExpiredEventData.cs
  4. 15
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs
  5. 92
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/DefaultJobStateChecker.cs
  6. 24
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobDispatcher.cs
  7. 29
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStateChecker.cs
  8. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs
  9. 48
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs
  10. 27
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCleaningJob.cs
  11. 11
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
  12. 14
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/NullJobDispatcher.cs
  13. 5
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs
  14. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoFilter.cs
  15. 1
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoSpecification.cs
  16. 12
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs
  17. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs
  18. 3
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs

24
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/DistributedJobDispatcher.cs

@ -52,4 +52,28 @@ public class DistributedJobDispatcher : IJobDispatcher, ITransientDependency
return true; return true;
} }
public async virtual Task CleanExpiredJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default)
{
var eventData = new JobCleanupExpiredEventData
{
TenantId = tenantId,
};
await EventBus.PublishAsync(eventData);
}
public async virtual Task CheckRuningJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default)
{
var eventData = new JobCheckRuningEventData
{
TenantId = tenantId,
};
await EventBus.PublishAsync(eventData);
}
} }

12
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCheckRuningEventData.cs

@ -0,0 +1,12 @@
using System;
using Volo.Abp.EventBus;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.check_running")]
public class JobCheckRuningEventData : IMultiTenant
{
public Guid? TenantId { get; set; }
}

12
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCleanupExpiredEventData.cs

@ -0,0 +1,12 @@
using System;
using Volo.Abp.EventBus;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.cleanup_expired")]
public class JobCleanupExpiredEventData : IMultiTenant
{
public Guid? TenantId { get; set; }
}

15
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs

@ -11,19 +11,24 @@ public class JobSynchronizer :
IDistributedEventHandler<JobPauseEventData>, IDistributedEventHandler<JobPauseEventData>,
IDistributedEventHandler<JobResumeEventData>, IDistributedEventHandler<JobResumeEventData>,
IDistributedEventHandler<JobDeleteEventData>, IDistributedEventHandler<JobDeleteEventData>,
IDistributedEventHandler<JobCheckRuningEventData>,
IDistributedEventHandler<JobCleanupExpiredEventData>,
ITransientDependency ITransientDependency
{ {
protected IJobStore JobStore { get; } protected IJobStore JobStore { get; }
protected IJobScheduler JobScheduler { get; } protected IJobScheduler JobScheduler { get; }
protected IJobStateChecker JobStateChecker { get; }
protected AbpBackgroundTasksOptions BackgroundTasksOptions { get; } protected AbpBackgroundTasksOptions BackgroundTasksOptions { get; }
public JobSynchronizer( public JobSynchronizer(
IJobStore jobStore, IJobStore jobStore,
IJobScheduler jobScheduler, IJobScheduler jobScheduler,
IJobStateChecker jobStateChecker,
IOptions<AbpBackgroundTasksOptions> options) IOptions<AbpBackgroundTasksOptions> options)
{ {
JobStore = jobStore; JobStore = jobStore;
JobScheduler = jobScheduler; JobScheduler = jobScheduler;
JobStateChecker = jobStateChecker;
BackgroundTasksOptions = options.Value; BackgroundTasksOptions = options.Value;
} }
@ -134,4 +139,14 @@ public class JobSynchronizer :
} }
} }
} }
public async virtual Task HandleEventAsync(JobCleanupExpiredEventData eventData)
{
await JobStateChecker.CleanExpiredJobAsync(eventData.TenantId);
}
public async virtual Task HandleEventAsync(JobCheckRuningEventData eventData)
{
await JobStateChecker.CheckRuningJobAsync(eventData.TenantId);
}
} }

92
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/DefaultJobStateChecker.cs

@ -0,0 +1,92 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks;
public class DefaultJobStateChecker : IJobStateChecker, ITransientDependency
{
private readonly ICurrentTenant _currentTenant;
private readonly IJobStore _jobStore;
private readonly IJobScheduler _jobScheduler;
private readonly AbpBackgroundTasksOptions _options;
private readonly ILogger<DefaultJobStateChecker> _logger;
public DefaultJobStateChecker(
ICurrentTenant currentTenant,
IJobStore jobStore,
IJobScheduler jobScheduler,
IOptions<AbpBackgroundTasksOptions> options,
ILogger<DefaultJobStateChecker> logger)
{
_currentTenant = currentTenant;
_jobStore = jobStore;
_jobScheduler = jobScheduler;
_options = options.Value;
_logger = logger;
}
public async virtual Task CheckRuningJobAsync(Guid? tenantId = null, CancellationToken cancellationToken = default)
{
try
{
using (_currentTenant.Change(tenantId))
{
var runingJobs = await _jobStore.GetRuningListAsync(
_options.MaxJobCheckCount,
_options.NodeName,
cancellationToken);
if (runingJobs.Count == 0)
{
return;
}
foreach (var runingJob in runingJobs)
{
// 当标记为运行中的作业不在调度器中时,改变为已停止作业
if (!await _jobScheduler.ExistsAsync(runingJob, cancellationToken))
{
runingJob.Status = JobStatus.Stopped;
await _jobStore.StoreAsync(runingJob);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "check job status error.");
}
}
public async virtual Task CleanExpiredJobAsync(Guid? tenantId = null, CancellationToken cancellationToken = default)
{
try
{
using (_currentTenant.Change(tenantId))
{
var expiredJobs = await _jobStore.CleanupAsync(
_options.MaxJobCleanCount,
_options.JobExpiratime,
_options.NodeName,
cancellationToken);
foreach (var expiredJob in expiredJobs)
{
// 从队列强制移除作业
await _jobScheduler.RemoveAsync(expiredJob, cancellationToken);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "cleanup expired job error.");
}
}
}

24
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobDispatcher.cs

@ -32,4 +32,28 @@ public interface IJobDispatcher
string nodeName = null, string nodeName = null,
Guid? tenantId = null, Guid? tenantId = null,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
/// <summary>
/// 通知清理过期作业
/// </summary>
/// <remarks>
/// 通知各节点清理当前节点中过期作业
/// </remarks>
/// <param name="tenantId">租户标识</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task CleanExpiredJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default);
/// <summary>
/// 检查运行中的作业
/// </summary>
/// <remarks>
/// 通知各节点检查当前节点中运行中作业
/// </remarks>
/// <param name="tenantId">租户标识</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task CheckRuningJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default);
} }

29
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStateChecker.cs

@ -0,0 +1,29 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace LINGYUN.Abp.BackgroundTasks;
/// <summary>
/// 作业状态检查接口
/// </summary>
public interface IJobStateChecker
{
/// <summary>
/// 清理过期作业
/// </summary>
/// <param name="tenantId">租户标识</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task CleanExpiredJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default);
/// <summary>
/// 检查运行中的作业
/// </summary>
/// <param name="tenantId">租户标识</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task CheckRuningJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default);
}

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs

@ -9,6 +9,7 @@ public interface IJobStore
{ {
Task<List<JobInfo>> GetRuningListAsync( Task<List<JobInfo>> GetRuningListAsync(
int maxResultCount, int maxResultCount,
string nodeName = null,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
Task<List<JobInfo>> GetWaitingListAsync( Task<List<JobInfo>> GetWaitingListAsync(
@ -35,5 +36,6 @@ public interface IJobStore
Task<List<JobInfo>> CleanupAsync( Task<List<JobInfo>> CleanupAsync(
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
string nodeName = null,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
} }

48
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs

@ -1,11 +1,5 @@
using Microsoft.Extensions.DependencyInjection; using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Auditing; using Volo.Abp.Auditing;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks.Internal; namespace LINGYUN.Abp.BackgroundTasks.Internal;
@ -15,43 +9,13 @@ public class BackgroundCheckingJob : IJobRunnable
{ {
public async virtual Task ExecuteAsync(JobRunnableContext context) public async virtual Task ExecuteAsync(JobRunnableContext context)
{ {
try context.TryGetMultiTenantId(out var tenantId);
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundTasksOptions>>().Value;
var store = context.ServiceProvider.GetRequiredService<IJobStore>();
var currentTenant = context.ServiceProvider.GetRequiredService<ICurrentTenant>();
context.TryGetMultiTenantId(out var tenantId); var jobDispatcher = context.GetRequiredService<IJobDispatcher>();
var jobStateChecker = context.GetRequiredService<IJobStateChecker>();
using (currentTenant.Change(tenantId)) await jobStateChecker.CheckRuningJobAsync(tenantId, context.CancellationToken);
{
var runingTasks = await store.GetRuningListAsync(
options.MaxJobCheckCount, context.CancellationToken);
if (!runingTasks.Any()) await jobDispatcher.CheckRuningJobAsync(tenantId, context.CancellationToken);
{
return;
}
var jobScheduler = context.ServiceProvider.GetRequiredService<IJobScheduler>();
foreach (var job in runingTasks)
{
// 当标记为运行中的作业不在调度器中时,改变为已停止作业
if (!await jobScheduler.ExistsAsync(job, context.CancellationToken))
{
job.Status = JobStatus.Stopped;
await store.StoreAsync(job);
}
}
}
}
catch(Exception ex)
{
context.ServiceProvider
.GetService<ILogger<BackgroundCheckingJob>>()
?.LogError(ex, "check job status error.");
}
} }
} }

27
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCleaningJob.cs

@ -1,9 +1,5 @@
using Microsoft.Extensions.DependencyInjection; using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Auditing; using Volo.Abp.Auditing;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks.Internal; namespace LINGYUN.Abp.BackgroundTasks.Internal;
@ -13,26 +9,13 @@ public class BackgroundCleaningJob : IJobRunnable
{ {
public async virtual Task ExecuteAsync(JobRunnableContext context) public async virtual Task ExecuteAsync(JobRunnableContext context)
{ {
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundTasksOptions>>().Value;
var store = context.ServiceProvider.GetRequiredService<IJobStore>();
var currentTenant = context.ServiceProvider.GetRequiredService<ICurrentTenant>();
context.TryGetMultiTenantId(out var tenantId); context.TryGetMultiTenantId(out var tenantId);
using (currentTenant.Change(tenantId)) var jobDispatcher = context.GetRequiredService<IJobDispatcher>();
{ var jobStateChecker = context.GetRequiredService<IJobStateChecker>();
var expiredJobs = await store.CleanupAsync(
options.MaxJobCleanCount,
options.JobExpiratime,
context.CancellationToken);
var jobScheduler = context.ServiceProvider.GetRequiredService<IJobScheduler>(); await jobStateChecker.CleanExpiredJobAsync(tenantId, context.CancellationToken);
foreach (var expiredJob in expiredJobs) await jobDispatcher.CleanExpiredJobAsync(tenantId, context.CancellationToken);
{
// 从队列强制移除作业
await jobScheduler.RemoveAsync(expiredJob, context.CancellationToken);
}
}
} }
} }

11
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs

@ -34,14 +34,14 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
return Task.FromResult(jobs); return Task.FromResult(jobs);
} }
public virtual Task<List<JobInfo>> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default) public virtual Task<List<JobInfo>> GetRuningListAsync(int maxResultCount, string nodeName = null, CancellationToken cancellationToken = default)
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
var status = new JobStatus[] { JobStatus.Running }; var status = new JobStatus[] { JobStatus.Running };
var jobs = _memoryJobStore var jobs = _memoryJobStore
.Where(x => status.Contains(x.Status)) .Where(x => x.NodeName == nodeName && status.Contains(x.Status))
.Take(maxResultCount) .Take(maxResultCount)
.ToList(); .ToList();
@ -130,7 +130,11 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
return Task.CompletedTask; return Task.CompletedTask;
} }
public virtual Task<List<JobInfo>> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default) public virtual Task<List<JobInfo>> CleanupAsync(
int maxResultCount,
TimeSpan jobExpiratime,
string nodeName = null,
CancellationToken cancellationToken = default)
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
@ -141,6 +145,7 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
var expiratime = DateTime.Now - jobExpiratime; var expiratime = DateTime.Now - jobExpiratime;
expriaJobs = _memoryJobStore expriaJobs = _memoryJobStore
.WhereIf(!nodeName.IsNullOrWhiteSpace(), x => x.NodeName == nodeName)
.Where(x => x.Status == JobStatus.Completed && .Where(x => x.Status == JobStatus.Completed &&
expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0) expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
.Take(maxResultCount) .Take(maxResultCount)

14
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/NullJobDispatcher.cs

@ -23,4 +23,18 @@ public class NullJobDispatcher : IJobDispatcher, ISingletonDependency
{ {
return Task.FromResult(false); return Task.FromResult(false);
} }
public Task CleanExpiredJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public Task CheckRuningJobAsync(
Guid? tenantId = null,
CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
} }

5
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs

@ -146,7 +146,10 @@ public class BackgroundJobInfo : AuditedAggregateRoot<string>, IMultiTenant
// TODO: 是否需要将参数挪到另一个实体? // TODO: 是否需要将参数挪到另一个实体?
// 任务参数的建议是尽量最小化, 仅存储关键信息 // 任务参数的建议是尽量最小化, 仅存储关键信息
Args = new ExtraPropertyDictionary(); Args = new ExtraPropertyDictionary();
Args.AddIfNotContains(args); if (args != null)
{
Args.AddIfNotContains(args);
}
} }
public void SetPeriodJob(string cron) public void SetPeriodJob(string cron)

4
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoFilter.cs

@ -25,6 +25,10 @@ public class BackgroundJobInfoFilter
/// </summary> /// </summary>
public string Type { get; set; } public string Type { get; set; }
/// <summary> /// <summary>
/// 节点名称
/// </summary>
public string NodeName { get; set; }
/// <summary>
/// 任务状态 /// 任务状态
/// </summary> /// </summary>
public JobStatus? Status { get; set; } public JobStatus? Status { get; set; }

1
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoSpecification.cs

@ -16,6 +16,7 @@ public class BackgroundJobInfoSpecification : Specification<BackgroundJobInfo>
Expression<Func<BackgroundJobInfo, bool>> expression = _ => true; Expression<Func<BackgroundJobInfo, bool>> expression = _ => true;
return expression return expression
.AndIf(!Filter.NodeName.IsNullOrWhiteSpace(), x => x.NodeName == Filter.NodeName)
.AndIf(!Filter.Type.IsNullOrWhiteSpace(), x => x.Type.Contains(Filter.Type)) .AndIf(!Filter.Type.IsNullOrWhiteSpace(), x => x.Type.Contains(Filter.Type))
.AndIf(!Filter.Group.IsNullOrWhiteSpace(), x => x.Group.Equals(Filter.Group)) .AndIf(!Filter.Group.IsNullOrWhiteSpace(), x => x.Group.Equals(Filter.Group))
.AndIf(!Filter.Name.IsNullOrWhiteSpace(), x => x.Name.Equals(Filter.Name)) .AndIf(!Filter.Name.IsNullOrWhiteSpace(), x => x.Name.Equals(Filter.Name))

12
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs

@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.ObjectMapping; using Volo.Abp.ObjectMapping;
using Volo.Abp.Specifications;
using Volo.Abp.Uow; using Volo.Abp.Uow;
namespace LINGYUN.Abp.TaskManagement; namespace LINGYUN.Abp.TaskManagement;
@ -40,13 +41,10 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos); return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos);
} }
public async virtual Task<List<JobInfo>> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default) public async virtual Task<List<JobInfo>> GetRuningListAsync(int maxResultCount, string nodeName = null, CancellationToken cancellationToken = default)
{ {
var filter = new BackgroundJobInfoFilter var specification = new ExpressionSpecification<BackgroundJobInfo>(
{ x => x.NodeName == nodeName && x.Status == JobStatus.Running);
Status = JobStatus.Running
};
var specification = new BackgroundJobInfoSpecification(filter);
var jobInfos = await JobInfoRepository.GetListAsync( var jobInfos = await JobInfoRepository.GetListAsync(
specification, maxResultCount: maxResultCount, cancellationToken: cancellationToken); specification, maxResultCount: maxResultCount, cancellationToken: cancellationToken);
@ -175,12 +173,14 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
public async virtual Task<List<JobInfo>> CleanupAsync( public async virtual Task<List<JobInfo>> CleanupAsync(
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
string nodeName = null,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
using var unitOfWork = UnitOfWorkManager.Begin(); using var unitOfWork = UnitOfWorkManager.Begin();
var jobs = await JobInfoRepository.GetExpiredJobsAsync( var jobs = await JobInfoRepository.GetExpiredJobsAsync(
maxResultCount, maxResultCount,
jobExpiratime, jobExpiratime,
nodeName,
cancellationToken); cancellationToken);
var expiredJobs = ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobs); var expiredJobs = ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobs);

2
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs

@ -30,11 +30,13 @@ public interface IBackgroundJobInfoRepository : IRepository<BackgroundJobInfo, s
/// </summary> /// </summary>
/// <param name="maxResultCount"></param> /// <param name="maxResultCount"></param>
/// <param name="jobExpiratime"></param> /// <param name="jobExpiratime"></param>
/// <param name="nodeName"></param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns></returns> /// <returns></returns>
Task<List<BackgroundJobInfo>> GetExpiredJobsAsync( Task<List<BackgroundJobInfo>> GetExpiredJobsAsync(
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
string nodeName = null,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// 获取所有周期性任务 /// 获取所有周期性任务

3
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs

@ -51,12 +51,13 @@ public class EfCoreBackgroundJobInfoRepository :
public async virtual Task<List<BackgroundJobInfo>> GetExpiredJobsAsync( public async virtual Task<List<BackgroundJobInfo>> GetExpiredJobsAsync(
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
string nodeName = null,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var expiratime = Clock.Now.Subtract(jobExpiratime); var expiratime = Clock.Now.Subtract(jobExpiratime);
return await (await GetDbSetAsync()) return await (await GetDbSetAsync())
.Where(x => x.Status == JobStatus.Completed && x.LastRunTime <= expiratime) .Where(x => x.NodeName == nodeName && x.Status == JobStatus.Completed && x.LastRunTime <= expiratime)
.OrderBy(x => x.CreationTime) .OrderBy(x => x.CreationTime)
.Take(maxResultCount) .Take(maxResultCount)
.ToListAsync(GetCancellationToken(cancellationToken)); .ToListAsync(GetCancellationToken(cancellationToken));

Loading…
Cancel
Save