diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/DistributedJobDispatcher.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/DistributedJobDispatcher.cs index 59c04a238..6fb12bc86 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/DistributedJobDispatcher.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/DistributedJobDispatcher.cs @@ -52,4 +52,28 @@ public class DistributedJobDispatcher : IJobDispatcher, ITransientDependency return true; } + + public async virtual Task CleanExpiredJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default) + { + var eventData = new JobCleanupExpiredEventData + { + TenantId = tenantId, + }; + + await EventBus.PublishAsync(eventData); + } + + public async virtual Task CheckRuningJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default) + { + var eventData = new JobCheckRuningEventData + { + TenantId = tenantId, + }; + + await EventBus.PublishAsync(eventData); + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCheckRuningEventData.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCheckRuningEventData.cs new file mode 100644 index 000000000..f00d8d612 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCheckRuningEventData.cs @@ -0,0 +1,12 @@ +using System; +using Volo.Abp.EventBus; +using Volo.Abp.MultiTenancy; + +namespace LINGYUN.Abp.BackgroundTasks.EventBus; + +[Serializable] +[EventName("abp.background-tasks.job.check_running")] +public class JobCheckRuningEventData : IMultiTenant +{ + public Guid? TenantId { get; set; } +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCleanupExpiredEventData.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCleanupExpiredEventData.cs new file mode 100644 index 000000000..958bd88ff --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobCleanupExpiredEventData.cs @@ -0,0 +1,12 @@ +using System; +using Volo.Abp.EventBus; +using Volo.Abp.MultiTenancy; + +namespace LINGYUN.Abp.BackgroundTasks.EventBus; + +[Serializable] +[EventName("abp.background-tasks.job.cleanup_expired")] +public class JobCleanupExpiredEventData : IMultiTenant +{ + public Guid? TenantId { get; set; } +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs index f9007944d..aca183095 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs @@ -11,19 +11,24 @@ public class JobSynchronizer : IDistributedEventHandler, IDistributedEventHandler, IDistributedEventHandler, + IDistributedEventHandler, + IDistributedEventHandler, ITransientDependency { protected IJobStore JobStore { get; } protected IJobScheduler JobScheduler { get; } + protected IJobStateChecker JobStateChecker { get; } protected AbpBackgroundTasksOptions BackgroundTasksOptions { get; } public JobSynchronizer( IJobStore jobStore, IJobScheduler jobScheduler, + IJobStateChecker jobStateChecker, IOptions options) { JobStore = jobStore; JobScheduler = jobScheduler; + JobStateChecker = jobStateChecker; BackgroundTasksOptions = options.Value; } @@ -134,4 +139,14 @@ public class JobSynchronizer : } } } + + public async virtual Task HandleEventAsync(JobCleanupExpiredEventData eventData) + { + await JobStateChecker.CleanExpiredJobAsync(eventData.TenantId); + } + + public async virtual Task HandleEventAsync(JobCheckRuningEventData eventData) + { + await JobStateChecker.CheckRuningJobAsync(eventData.TenantId); + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/DefaultJobStateChecker.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/DefaultJobStateChecker.cs new file mode 100644 index 000000000..7fba55221 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/DefaultJobStateChecker.cs @@ -0,0 +1,92 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; +using Volo.Abp.MultiTenancy; + +namespace LINGYUN.Abp.BackgroundTasks; + +public class DefaultJobStateChecker : IJobStateChecker, ITransientDependency +{ + private readonly ICurrentTenant _currentTenant; + + private readonly IJobStore _jobStore; + private readonly IJobScheduler _jobScheduler; + private readonly AbpBackgroundTasksOptions _options; + private readonly ILogger _logger; + + public DefaultJobStateChecker( + ICurrentTenant currentTenant, + IJobStore jobStore, + IJobScheduler jobScheduler, + IOptions options, + ILogger logger) + { + _currentTenant = currentTenant; + _jobStore = jobStore; + _jobScheduler = jobScheduler; + _options = options.Value; + _logger = logger; + } + + public async virtual Task CheckRuningJobAsync(Guid? tenantId = null, CancellationToken cancellationToken = default) + { + try + { + using (_currentTenant.Change(tenantId)) + { + var runingJobs = await _jobStore.GetRuningListAsync( + _options.MaxJobCheckCount, + _options.NodeName, + cancellationToken); + + if (runingJobs.Count == 0) + { + return; + } + + foreach (var runingJob in runingJobs) + { + // 当标记为运行中的作业不在调度器中时,改变为已停止作业 + if (!await _jobScheduler.ExistsAsync(runingJob, cancellationToken)) + { + runingJob.Status = JobStatus.Stopped; + + await _jobStore.StoreAsync(runingJob); + } + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "check job status error."); + } + } + + public async virtual Task CleanExpiredJobAsync(Guid? tenantId = null, CancellationToken cancellationToken = default) + { + try + { + using (_currentTenant.Change(tenantId)) + { + var expiredJobs = await _jobStore.CleanupAsync( + _options.MaxJobCleanCount, + _options.JobExpiratime, + _options.NodeName, + cancellationToken); + + foreach (var expiredJob in expiredJobs) + { + // 从队列强制移除作业 + await _jobScheduler.RemoveAsync(expiredJob, cancellationToken); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "cleanup expired job error."); + } + } +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobDispatcher.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobDispatcher.cs index 5802ec247..86525f478 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobDispatcher.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobDispatcher.cs @@ -32,4 +32,28 @@ public interface IJobDispatcher string nodeName = null, Guid? tenantId = null, CancellationToken cancellationToken = default); + /// + /// 通知清理过期作业 + /// + /// + /// 通知各节点清理当前节点中过期作业 + /// + /// 租户标识 + /// + /// + Task CleanExpiredJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default); + /// + /// 检查运行中的作业 + /// + /// + /// 通知各节点检查当前节点中运行中作业 + /// + /// 租户标识 + /// + /// + Task CheckRuningJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default); } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStateChecker.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStateChecker.cs new file mode 100644 index 000000000..a60aeaf63 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStateChecker.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace LINGYUN.Abp.BackgroundTasks; +/// +/// 作业状态检查接口 +/// +public interface IJobStateChecker +{ + /// + /// 清理过期作业 + /// + /// 租户标识 + /// + /// + Task CleanExpiredJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default); + /// + /// 检查运行中的作业 + /// + /// 租户标识 + /// + /// + Task CheckRuningJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default); +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs index 95fa9d6ae..f4dd712dd 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/IJobStore.cs @@ -9,6 +9,7 @@ public interface IJobStore { Task> GetRuningListAsync( int maxResultCount, + string nodeName = null, CancellationToken cancellationToken = default); Task> GetWaitingListAsync( @@ -35,5 +36,6 @@ public interface IJobStore Task> CleanupAsync( int maxResultCount, TimeSpan jobExpiratime, + string nodeName = null, CancellationToken cancellationToken = default); } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs index bfb9eb553..ba4a98bba 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCheckingJob.cs @@ -1,11 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Linq; -using System.Threading.Tasks; +using System.Threading.Tasks; using Volo.Abp.Auditing; -using Volo.Abp.MultiTenancy; namespace LINGYUN.Abp.BackgroundTasks.Internal; @@ -15,43 +9,13 @@ public class BackgroundCheckingJob : IJobRunnable { public async virtual Task ExecuteAsync(JobRunnableContext context) { - try - { - var options = context.ServiceProvider.GetRequiredService>().Value; - var store = context.ServiceProvider.GetRequiredService(); - var currentTenant = context.ServiceProvider.GetRequiredService(); + context.TryGetMultiTenantId(out var tenantId); - context.TryGetMultiTenantId(out var tenantId); + var jobDispatcher = context.GetRequiredService(); + var jobStateChecker = context.GetRequiredService(); - using (currentTenant.Change(tenantId)) - { - var runingTasks = await store.GetRuningListAsync( - options.MaxJobCheckCount, context.CancellationToken); + await jobStateChecker.CheckRuningJobAsync(tenantId, context.CancellationToken); - if (!runingTasks.Any()) - { - return; - } - - var jobScheduler = context.ServiceProvider.GetRequiredService(); - - foreach (var job in runingTasks) - { - // 当标记为运行中的作业不在调度器中时,改变为已停止作业 - if (!await jobScheduler.ExistsAsync(job, context.CancellationToken)) - { - job.Status = JobStatus.Stopped; - - await store.StoreAsync(job); - } - } - } - } - catch(Exception ex) - { - context.ServiceProvider - .GetService>() - ?.LogError(ex, "check job status error."); - } + await jobDispatcher.CheckRuningJobAsync(tenantId, context.CancellationToken); } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCleaningJob.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCleaningJob.cs index 6300efe4d..1db4f1a80 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCleaningJob.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/BackgroundCleaningJob.cs @@ -1,9 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using System.Threading; -using System.Threading.Tasks; +using System.Threading.Tasks; using Volo.Abp.Auditing; -using Volo.Abp.MultiTenancy; namespace LINGYUN.Abp.BackgroundTasks.Internal; @@ -13,26 +9,13 @@ public class BackgroundCleaningJob : IJobRunnable { public async virtual Task ExecuteAsync(JobRunnableContext context) { - var options = context.ServiceProvider.GetRequiredService>().Value; - var store = context.ServiceProvider.GetRequiredService(); - var currentTenant = context.ServiceProvider.GetRequiredService(); - context.TryGetMultiTenantId(out var tenantId); - using (currentTenant.Change(tenantId)) - { - var expiredJobs = await store.CleanupAsync( - options.MaxJobCleanCount, - options.JobExpiratime, - context.CancellationToken); + var jobDispatcher = context.GetRequiredService(); + var jobStateChecker = context.GetRequiredService(); - var jobScheduler = context.ServiceProvider.GetRequiredService(); + await jobStateChecker.CleanExpiredJobAsync(tenantId, context.CancellationToken); - foreach (var expiredJob in expiredJobs) - { - // 从队列强制移除作业 - await jobScheduler.RemoveAsync(expiredJob, context.CancellationToken); - } - } + await jobDispatcher.CleanExpiredJobAsync(tenantId, context.CancellationToken); } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs index 47769a2c7..4725c79b2 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs @@ -34,14 +34,14 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency return Task.FromResult(jobs); } - public virtual Task> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default) + public virtual Task> GetRuningListAsync(int maxResultCount, string nodeName = null, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); var status = new JobStatus[] { JobStatus.Running }; var jobs = _memoryJobStore - .Where(x => status.Contains(x.Status)) + .Where(x => x.NodeName == nodeName && status.Contains(x.Status)) .Take(maxResultCount) .ToList(); @@ -130,7 +130,11 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency return Task.CompletedTask; } - public virtual Task> CleanupAsync(int maxResultCount, TimeSpan jobExpiratime, CancellationToken cancellationToken = default) + public virtual Task> CleanupAsync( + int maxResultCount, + TimeSpan jobExpiratime, + string nodeName = null, + CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); @@ -141,6 +145,7 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency var expiratime = DateTime.Now - jobExpiratime; expriaJobs = _memoryJobStore + .WhereIf(!nodeName.IsNullOrWhiteSpace(), x => x.NodeName == nodeName) .Where(x => x.Status == JobStatus.Completed && expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0) .Take(maxResultCount) diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/NullJobDispatcher.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/NullJobDispatcher.cs index 31edfb4a8..4dd9b041d 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/NullJobDispatcher.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/NullJobDispatcher.cs @@ -23,4 +23,18 @@ public class NullJobDispatcher : IJobDispatcher, ISingletonDependency { return Task.FromResult(false); } + + public Task CleanExpiredJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } + + public Task CheckRuningJobAsync( + Guid? tenantId = null, + CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs index 871720cc0..ae02d8b4e 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs @@ -146,7 +146,10 @@ public class BackgroundJobInfo : AuditedAggregateRoot, IMultiTenant // TODO: 是否需要将参数挪到另一个实体? // 任务参数的建议是尽量最小化, 仅存储关键信息 Args = new ExtraPropertyDictionary(); - Args.AddIfNotContains(args); + if (args != null) + { + Args.AddIfNotContains(args); + } } public void SetPeriodJob(string cron) diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoFilter.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoFilter.cs index bece73b02..91453f508 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoFilter.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoFilter.cs @@ -25,6 +25,10 @@ public class BackgroundJobInfoFilter /// public string Type { get; set; } /// + /// 节点名称 + /// + public string NodeName { get; set; } + /// /// 任务状态 /// public JobStatus? Status { get; set; } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoSpecification.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoSpecification.cs index e1028c5d0..3d2c5e115 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoSpecification.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfoSpecification.cs @@ -16,6 +16,7 @@ public class BackgroundJobInfoSpecification : Specification Expression> expression = _ => true; return expression + .AndIf(!Filter.NodeName.IsNullOrWhiteSpace(), x => x.NodeName == Filter.NodeName) .AndIf(!Filter.Type.IsNullOrWhiteSpace(), x => x.Type.Contains(Filter.Type)) .AndIf(!Filter.Group.IsNullOrWhiteSpace(), x => x.Group.Equals(Filter.Group)) .AndIf(!Filter.Name.IsNullOrWhiteSpace(), x => x.Name.Equals(Filter.Name)) diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs index df1e7ac23..21fbd131d 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Volo.Abp.DependencyInjection; using Volo.Abp.MultiTenancy; using Volo.Abp.ObjectMapping; +using Volo.Abp.Specifications; using Volo.Abp.Uow; namespace LINGYUN.Abp.TaskManagement; @@ -40,13 +41,10 @@ public class BackgroundJobStore : IJobStore, ITransientDependency return ObjectMapper.Map, List>(jobInfos); } - public async virtual Task> GetRuningListAsync(int maxResultCount, CancellationToken cancellationToken = default) + public async virtual Task> GetRuningListAsync(int maxResultCount, string nodeName = null, CancellationToken cancellationToken = default) { - var filter = new BackgroundJobInfoFilter - { - Status = JobStatus.Running - }; - var specification = new BackgroundJobInfoSpecification(filter); + var specification = new ExpressionSpecification( + x => x.NodeName == nodeName && x.Status == JobStatus.Running); var jobInfos = await JobInfoRepository.GetListAsync( specification, maxResultCount: maxResultCount, cancellationToken: cancellationToken); @@ -175,12 +173,14 @@ public class BackgroundJobStore : IJobStore, ITransientDependency public async virtual Task> CleanupAsync( int maxResultCount, TimeSpan jobExpiratime, + string nodeName = null, CancellationToken cancellationToken = default) { using var unitOfWork = UnitOfWorkManager.Begin(); var jobs = await JobInfoRepository.GetExpiredJobsAsync( maxResultCount, jobExpiratime, + nodeName, cancellationToken); var expiredJobs = ObjectMapper.Map, List>(jobs); diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs index 40bd4bba9..0aa573e86 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs @@ -30,11 +30,13 @@ public interface IBackgroundJobInfoRepository : IRepository /// /// + /// /// /// Task> GetExpiredJobsAsync( int maxResultCount, TimeSpan jobExpiratime, + string nodeName = null, CancellationToken cancellationToken = default); /// /// 获取所有周期性任务 diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs index 672ff068e..7e1adcdac 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs @@ -51,12 +51,13 @@ public class EfCoreBackgroundJobInfoRepository : public async virtual Task> GetExpiredJobsAsync( int maxResultCount, TimeSpan jobExpiratime, + string nodeName = null, CancellationToken cancellationToken = default) { var expiratime = Clock.Now.Subtract(jobExpiratime); return await (await GetDbSetAsync()) - .Where(x => x.Status == JobStatus.Completed && x.LastRunTime <= expiratime) + .Where(x => x.NodeName == nodeName && x.Status == JobStatus.Completed && x.LastRunTime <= expiratime) .OrderBy(x => x.CreationTime) .Take(maxResultCount) .ToListAsync(GetCancellationToken(cancellationToken));