From 27b5c69d127baf86414bfce52f7812d90d7adf8c Mon Sep 17 00:00:00 2001 From: cKey <35512826+colinin@users.noreply.github.com> Date: Tue, 11 Jan 2022 19:40:58 +0800 Subject: [PATCH] fix(tasks): the lock task does not block the listener from running --- .../Abp/BackgroundTasks/IJobLockProvider.cs | 19 ++++++ .../Quartz/AbpBackgroundTasksQuartzModule.cs | 2 + .../Quartz/QuartzJobExecutorProvider.cs | 10 +-- .../Quartz/QuartzJobListener.cs | 22 +++++-- .../Quartz/QuartzTriggerListener.cs | 56 +++++++++++++++++ .../LINGYUN.Abp.BackgroundTasks.csproj | 1 + .../Abp/BackgroundTasks/JobLockProvider.cs | 61 +++++++++++++++++++ .../BackgroundTasks/JobRunnableExecuter.cs | 30 +-------- .../BackgroundJobInfoAppService.cs | 6 +- ...Service.TaskManagement.HttpApi.Host.csproj | 1 + .../TaskManagementHttpApiHostModule.cs | 2 + 11 files changed, 171 insertions(+), 39 deletions(-) create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs create mode 100644 aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs new file mode 100644 index 000000000..0d5979633 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs @@ -0,0 +1,19 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace LINGYUN.Abp.BackgroundTasks; + +/// +/// 作业锁定提供者 +/// +public interface IJobLockProvider +{ + Task TryLockAsync( + string jobKey, + int lockSeconds, + CancellationToken cancellationToken = default); + + Task TryReleaseAsync( + string jobKey, + CancellationToken cancellationToken = default); +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs index 983823e05..788825614 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs @@ -15,5 +15,7 @@ public class AbpBackgroundTasksQuartzModule : AbpModule var _scheduler = context.ServiceProvider.GetRequiredService(); _scheduler.ListenerManager.AddJobListener(context.ServiceProvider.GetRequiredService()); + _scheduler.ListenerManager.AddTriggerListener(context.ServiceProvider.GetRequiredService()); + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs index a1070ddb3..c20d3015d 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs @@ -36,10 +36,12 @@ public class QuartzJobExecutorProvider : IQuartzJobExecutorProvider, ISingletonD } var adapterType = typeof(QuartzJobSimpleAdapter<>); - if (job.LockTimeOut > 0) - { - adapterType = typeof(QuartzJobConcurrentAdapter<>); - } + + // 注释, 通过触发器监听锁定 + //if (job.LockTimeOut > 0) + //{ + // adapterType = typeof(QuartzJobConcurrentAdapter<>); + //} if (!typeof(IJob).IsAssignableFrom(jobType)) { diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs index b90cb54b6..6ed7e81c8 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs @@ -4,11 +4,12 @@ using Microsoft.Extensions.Logging.Abstractions; using Quartz; using Quartz.Listener; using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; using Volo.Abp.MultiTenancy; -using Volo.Abp.Uow; +using Volo.Abp.Timing; namespace LINGYUN.Abp.BackgroundTasks.Quartz; @@ -18,13 +19,16 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency public override string Name => "QuartzJobListener"; + protected IClock Clock { get; } protected IJobEventProvider EventProvider { get; } protected IServiceProvider ServiceProvider { get; } public QuartzJobListener( + IClock clock, IServiceProvider serviceProvider, IJobEventProvider eventProvider) { + Clock = clock; ServiceProvider = serviceProvider; EventProvider = eventProvider; @@ -50,6 +54,12 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency { try { + var jobEventList = EventProvider.GetAll(); + if (!jobEventList.Any()) + { + return; + } + using var scope = ServiceProvider.CreateScope(); var jobEventData = new JobEventData( jobUUId, @@ -60,7 +70,6 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency Result = context.Result?.ToString() }; - var jobEventList = EventProvider.GetAll(); var eventContext = new JobEventContext( scope.ServiceProvider, jobEventData); @@ -86,6 +95,12 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency { try { + var jobEventList = EventProvider.GetAll(); + if (!jobEventList.Any()) + { + return; + } + using var scope = ServiceProvider.CreateScope(); var jobId = context.GetString(nameof(JobInfo.Id)); if (Guid.TryParse(jobId, out var jobUUId)) @@ -112,7 +127,7 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency jobEventData.RepeatCount = simpleTrigger.RepeatCount; } jobEventData.Description = context.JobDetail.Description; - jobEventData.RunTime = context.FireTimeUtc.LocalDateTime; + jobEventData.RunTime = Clock.Now; jobEventData.LastRunTime = context.PreviousFireTimeUtc?.LocalDateTime; jobEventData.NextRunTime = context.NextFireTimeUtc?.LocalDateTime; if (context.Result != null) @@ -125,7 +140,6 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency jobEventData.TenantId = tenantId; } - var jobEventList = EventProvider.GetAll(); var eventContext = new JobEventContext( scope.ServiceProvider, jobEventData); diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs new file mode 100644 index 000000000..6b4182979 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs @@ -0,0 +1,56 @@ +using Quartz; +using Quartz.Listener; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; + +namespace LINGYUN.Abp.BackgroundTasks.Quartz; + +public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependency +{ + protected const string LockKeyFormat = "p:abp-background-tasks,job:{0},key:{1}"; + + public override string Name => "QuartzTriggerListener"; + + protected IJobLockProvider JobLockProvider { get; } + + public QuartzTriggerListener( + IJobLockProvider jobLockProvider) + { + JobLockProvider = jobLockProvider; + } + + public override async Task VetoJobExecution( + ITrigger trigger, + IJobExecutionContext context, + CancellationToken cancellationToken = default) + { + context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId); + context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime); + if (jobId != null && lockTime != null && lockTime is int time && time > 0) + { + + return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time, cancellationToken); + } + + return false; + } + + public override async Task TriggerComplete( + ITrigger trigger, + IJobExecutionContext context, + SchedulerInstruction triggerInstructionCode, + CancellationToken cancellationToken = default) + { + if (context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId) && + context.MergedJobDataMap.ContainsKey(nameof(JobInfo.LockTimeOut))) + { + await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId), cancellationToken); + } + } + + protected virtual string NormalizeKey(IJobExecutionContext context, object jobId) + { + return string.Format(LockKeyFormat, context.JobDetail.JobType.Name, jobId.ToString()); + } +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj index 14206d3f2..b49f1e324 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj @@ -11,6 +11,7 @@ + diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs new file mode 100644 index 000000000..85f286de8 --- /dev/null +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs @@ -0,0 +1,61 @@ +using Microsoft.Extensions.Caching.Memory; +using System; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; +using Volo.Abp.DistributedLocking; + +namespace LINGYUN.Abp.BackgroundTasks; + +[Dependency(ReplaceServices = true)] +public class JobLockProvider : IJobLockProvider, ISingletonDependency +{ + protected IMemoryCache LockCache { get; } + protected IAbpDistributedLock DistributedLock { get; } + + public JobLockProvider( + IMemoryCache lockCache, + IAbpDistributedLock distributedLock) + { + LockCache = lockCache; + DistributedLock = distributedLock; + } + + public virtual async Task TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) + { + var handle = await DistributedLock.TryAcquireAsync(jobKey, cancellationToken: cancellationToken); + if (handle != null) + { + await LockCache.GetOrCreateAsync(jobKey, (entry) => + { + entry.SetAbsoluteExpiration(TimeSpan.FromSeconds(lockSeconds)); + entry.RegisterPostEvictionCallback(async (key, value, reason, state) => + { + if (reason == EvictionReason.Expired && value is IAbpDistributedLockHandle handleValue) + { + await handleValue.DisposeAsync(); + } + }); + entry.SetValue(handle); + + return Task.FromResult(handle); + }); + + return true; + } + return false; + } + + public virtual async Task TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) + { + if (LockCache.TryGetValue(jobKey, out var handle)) + { + await handle.DisposeAsync(); + + LockCache.Remove(jobKey); + + return true; + } + return false; + } +} diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs index 042f681fa..9e92ded39 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs @@ -1,17 +1,13 @@ using Microsoft.Extensions.DependencyInjection; using System; -using System.Collections.Generic; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; -using Volo.Abp.DistributedLocking; using Volo.Abp.MultiTenancy; namespace LINGYUN.Abp.BackgroundTasks; public class JobRunnableExecuter : IJobRunnableExecuter, ISingletonDependency { - protected const string LockKeyFormat = "p:{0},job:{1},key:{2}"; - public async virtual Task ExecuteAsync(JobRunnableContext context) { Guid? tenantId = null; @@ -24,31 +20,7 @@ public class JobRunnableExecuter : IJobRunnableExecuter, ISingletonDependency var currentTenant = context.ServiceProvider.GetRequiredService(); using (currentTenant.Change(tenantId)) { - context.JobData.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime); - - // 某些提供者如果无法保证锁一致性, 那么需要用分布式锁 - if (lockTime != null && (lockTime is int time && time > 0)) - { - var jobId = context.JobData.GetOrDefault(nameof(JobInfo.Id)); - var jobLockKey = string.Format(LockKeyFormat, tenantId?.ToString() ?? "Default", context.JobType.Name, jobId); - var distributedLock = context.ServiceProvider.GetRequiredService(); - - var handle = await distributedLock.TryAcquireAsync(jobLockKey, TimeSpan.FromSeconds(time)); - if (handle == null) - { - // 抛出异常 通过监听器使其重试 - throw new AbpBackgroundTaskConcurrentException(context.JobType); - } - - await using (handle) - { - await InternalExecuteAsync(context); - } - } - else - { - await InternalExecuteAsync(context); - } + await InternalExecuteAsync(context); } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs index 4ded0bf2a..c9ee39774 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs @@ -156,8 +156,10 @@ public class BackgroundJobInfoAppService : TaskManagementApplicationService, IBa backgroundJobInfo.MaxCount = input.MaxCount; backgroundJobInfo.MaxTryCount = input.MaxTryCount; - backgroundJobInfo.Args.RemoveAll(x => !input.Args.ContainsKey(x.Key)); - backgroundJobInfo.Args.AddIfNotContains(input.Args); + foreach (var arg in input.Args) + { + backgroundJobInfo.Args[arg.Key] = arg.Value; + } backgroundJobInfo.SetPriority(input.Priority); switch (input.JobType) diff --git a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj index e64223cd1..3ce7471c7 100644 --- a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj +++ b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj @@ -38,6 +38,7 @@ + diff --git a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs index 2ea812a44..44bd38542 100644 --- a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs +++ b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs @@ -22,6 +22,7 @@ using Volo.Abp.AspNetCore.Mvc; using Volo.Abp.AspNetCore.Serilog; using Volo.Abp.Autofac; using Volo.Abp.Caching.StackExchangeRedis; +using Volo.Abp.DistributedLocking; using Volo.Abp.EntityFrameworkCore.MySQL; using Volo.Abp.FeatureManagement.EntityFrameworkCore; using Volo.Abp.Http.Client.IdentityModel.Web; @@ -38,6 +39,7 @@ namespace LY.MicroService.TaskManagement; typeof(AbpSerilogEnrichersUniqueIdModule), typeof(AbpAuditLoggingElasticsearchModule), typeof(AbpAspNetCoreSerilogModule), + typeof(AbpDistributedLockingModule), typeof(AbpEntityFrameworkCoreMySQLModule), typeof(AbpAspNetCoreAuthenticationJwtBearerModule), typeof(AbpEmailingExceptionHandlingModule),