diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs
new file mode 100644
index 000000000..0d5979633
--- /dev/null
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs
@@ -0,0 +1,19 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace LINGYUN.Abp.BackgroundTasks;
+
+///
+/// 作业锁定提供者
+///
+public interface IJobLockProvider
+{
+ Task TryLockAsync(
+ string jobKey,
+ int lockSeconds,
+ CancellationToken cancellationToken = default);
+
+ Task TryReleaseAsync(
+ string jobKey,
+ CancellationToken cancellationToken = default);
+}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs
index 983823e05..788825614 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs
@@ -15,5 +15,7 @@ public class AbpBackgroundTasksQuartzModule : AbpModule
var _scheduler = context.ServiceProvider.GetRequiredService();
_scheduler.ListenerManager.AddJobListener(context.ServiceProvider.GetRequiredService());
+ _scheduler.ListenerManager.AddTriggerListener(context.ServiceProvider.GetRequiredService());
+
}
}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs
index a1070ddb3..c20d3015d 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs
@@ -36,10 +36,12 @@ public class QuartzJobExecutorProvider : IQuartzJobExecutorProvider, ISingletonD
}
var adapterType = typeof(QuartzJobSimpleAdapter<>);
- if (job.LockTimeOut > 0)
- {
- adapterType = typeof(QuartzJobConcurrentAdapter<>);
- }
+
+ // 注释, 通过触发器监听锁定
+ //if (job.LockTimeOut > 0)
+ //{
+ // adapterType = typeof(QuartzJobConcurrentAdapter<>);
+ //}
if (!typeof(IJob).IsAssignableFrom(jobType))
{
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs
index b90cb54b6..6ed7e81c8 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs
@@ -4,11 +4,12 @@ using Microsoft.Extensions.Logging.Abstractions;
using Quartz;
using Quartz.Listener;
using System;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
-using Volo.Abp.Uow;
+using Volo.Abp.Timing;
namespace LINGYUN.Abp.BackgroundTasks.Quartz;
@@ -18,13 +19,16 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
public override string Name => "QuartzJobListener";
+ protected IClock Clock { get; }
protected IJobEventProvider EventProvider { get; }
protected IServiceProvider ServiceProvider { get; }
public QuartzJobListener(
+ IClock clock,
IServiceProvider serviceProvider,
IJobEventProvider eventProvider)
{
+ Clock = clock;
ServiceProvider = serviceProvider;
EventProvider = eventProvider;
@@ -50,6 +54,12 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
{
try
{
+ var jobEventList = EventProvider.GetAll();
+ if (!jobEventList.Any())
+ {
+ return;
+ }
+
using var scope = ServiceProvider.CreateScope();
var jobEventData = new JobEventData(
jobUUId,
@@ -60,7 +70,6 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
Result = context.Result?.ToString()
};
- var jobEventList = EventProvider.GetAll();
var eventContext = new JobEventContext(
scope.ServiceProvider,
jobEventData);
@@ -86,6 +95,12 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
{
try
{
+ var jobEventList = EventProvider.GetAll();
+ if (!jobEventList.Any())
+ {
+ return;
+ }
+
using var scope = ServiceProvider.CreateScope();
var jobId = context.GetString(nameof(JobInfo.Id));
if (Guid.TryParse(jobId, out var jobUUId))
@@ -112,7 +127,7 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
jobEventData.RepeatCount = simpleTrigger.RepeatCount;
}
jobEventData.Description = context.JobDetail.Description;
- jobEventData.RunTime = context.FireTimeUtc.LocalDateTime;
+ jobEventData.RunTime = Clock.Now;
jobEventData.LastRunTime = context.PreviousFireTimeUtc?.LocalDateTime;
jobEventData.NextRunTime = context.NextFireTimeUtc?.LocalDateTime;
if (context.Result != null)
@@ -125,7 +140,6 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
jobEventData.TenantId = tenantId;
}
- var jobEventList = EventProvider.GetAll();
var eventContext = new JobEventContext(
scope.ServiceProvider,
jobEventData);
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs
new file mode 100644
index 000000000..6b4182979
--- /dev/null
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs
@@ -0,0 +1,56 @@
+using Quartz;
+using Quartz.Listener;
+using System.Threading;
+using System.Threading.Tasks;
+using Volo.Abp.DependencyInjection;
+
+namespace LINGYUN.Abp.BackgroundTasks.Quartz;
+
+public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependency
+{
+ protected const string LockKeyFormat = "p:abp-background-tasks,job:{0},key:{1}";
+
+ public override string Name => "QuartzTriggerListener";
+
+ protected IJobLockProvider JobLockProvider { get; }
+
+ public QuartzTriggerListener(
+ IJobLockProvider jobLockProvider)
+ {
+ JobLockProvider = jobLockProvider;
+ }
+
+ public override async Task VetoJobExecution(
+ ITrigger trigger,
+ IJobExecutionContext context,
+ CancellationToken cancellationToken = default)
+ {
+ context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId);
+ context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime);
+ if (jobId != null && lockTime != null && lockTime is int time && time > 0)
+ {
+
+ return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time, cancellationToken);
+ }
+
+ return false;
+ }
+
+ public override async Task TriggerComplete(
+ ITrigger trigger,
+ IJobExecutionContext context,
+ SchedulerInstruction triggerInstructionCode,
+ CancellationToken cancellationToken = default)
+ {
+ if (context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId) &&
+ context.MergedJobDataMap.ContainsKey(nameof(JobInfo.LockTimeOut)))
+ {
+ await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId), cancellationToken);
+ }
+ }
+
+ protected virtual string NormalizeKey(IJobExecutionContext context, object jobId)
+ {
+ return string.Format(LockKeyFormat, context.JobDetail.JobType.Name, jobId.ToString());
+ }
+}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj
index 14206d3f2..b49f1e324 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj
@@ -11,6 +11,7 @@
+
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs
new file mode 100644
index 000000000..85f286de8
--- /dev/null
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs
@@ -0,0 +1,61 @@
+using Microsoft.Extensions.Caching.Memory;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.DistributedLocking;
+
+namespace LINGYUN.Abp.BackgroundTasks;
+
+[Dependency(ReplaceServices = true)]
+public class JobLockProvider : IJobLockProvider, ISingletonDependency
+{
+ protected IMemoryCache LockCache { get; }
+ protected IAbpDistributedLock DistributedLock { get; }
+
+ public JobLockProvider(
+ IMemoryCache lockCache,
+ IAbpDistributedLock distributedLock)
+ {
+ LockCache = lockCache;
+ DistributedLock = distributedLock;
+ }
+
+ public virtual async Task TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default)
+ {
+ var handle = await DistributedLock.TryAcquireAsync(jobKey, cancellationToken: cancellationToken);
+ if (handle != null)
+ {
+ await LockCache.GetOrCreateAsync(jobKey, (entry) =>
+ {
+ entry.SetAbsoluteExpiration(TimeSpan.FromSeconds(lockSeconds));
+ entry.RegisterPostEvictionCallback(async (key, value, reason, state) =>
+ {
+ if (reason == EvictionReason.Expired && value is IAbpDistributedLockHandle handleValue)
+ {
+ await handleValue.DisposeAsync();
+ }
+ });
+ entry.SetValue(handle);
+
+ return Task.FromResult(handle);
+ });
+
+ return true;
+ }
+ return false;
+ }
+
+ public virtual async Task TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default)
+ {
+ if (LockCache.TryGetValue(jobKey, out var handle))
+ {
+ await handle.DisposeAsync();
+
+ LockCache.Remove(jobKey);
+
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs
index 042f681fa..9e92ded39 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs
@@ -1,17 +1,13 @@
using Microsoft.Extensions.DependencyInjection;
using System;
-using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
-using Volo.Abp.DistributedLocking;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks;
public class JobRunnableExecuter : IJobRunnableExecuter, ISingletonDependency
{
- protected const string LockKeyFormat = "p:{0},job:{1},key:{2}";
-
public async virtual Task ExecuteAsync(JobRunnableContext context)
{
Guid? tenantId = null;
@@ -24,31 +20,7 @@ public class JobRunnableExecuter : IJobRunnableExecuter, ISingletonDependency
var currentTenant = context.ServiceProvider.GetRequiredService();
using (currentTenant.Change(tenantId))
{
- context.JobData.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime);
-
- // 某些提供者如果无法保证锁一致性, 那么需要用分布式锁
- if (lockTime != null && (lockTime is int time && time > 0))
- {
- var jobId = context.JobData.GetOrDefault(nameof(JobInfo.Id));
- var jobLockKey = string.Format(LockKeyFormat, tenantId?.ToString() ?? "Default", context.JobType.Name, jobId);
- var distributedLock = context.ServiceProvider.GetRequiredService();
-
- var handle = await distributedLock.TryAcquireAsync(jobLockKey, TimeSpan.FromSeconds(time));
- if (handle == null)
- {
- // 抛出异常 通过监听器使其重试
- throw new AbpBackgroundTaskConcurrentException(context.JobType);
- }
-
- await using (handle)
- {
- await InternalExecuteAsync(context);
- }
- }
- else
- {
- await InternalExecuteAsync(context);
- }
+ await InternalExecuteAsync(context);
}
}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs
index 4ded0bf2a..c9ee39774 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs
@@ -156,8 +156,10 @@ public class BackgroundJobInfoAppService : TaskManagementApplicationService, IBa
backgroundJobInfo.MaxCount = input.MaxCount;
backgroundJobInfo.MaxTryCount = input.MaxTryCount;
- backgroundJobInfo.Args.RemoveAll(x => !input.Args.ContainsKey(x.Key));
- backgroundJobInfo.Args.AddIfNotContains(input.Args);
+ foreach (var arg in input.Args)
+ {
+ backgroundJobInfo.Args[arg.Key] = arg.Value;
+ }
backgroundJobInfo.SetPriority(input.Priority);
switch (input.JobType)
diff --git a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj
index e64223cd1..3ce7471c7 100644
--- a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj
+++ b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj
@@ -38,6 +38,7 @@
+
diff --git a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs
index 2ea812a44..44bd38542 100644
--- a/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs
+++ b/aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs
@@ -22,6 +22,7 @@ using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.AspNetCore.Serilog;
using Volo.Abp.Autofac;
using Volo.Abp.Caching.StackExchangeRedis;
+using Volo.Abp.DistributedLocking;
using Volo.Abp.EntityFrameworkCore.MySQL;
using Volo.Abp.FeatureManagement.EntityFrameworkCore;
using Volo.Abp.Http.Client.IdentityModel.Web;
@@ -38,6 +39,7 @@ namespace LY.MicroService.TaskManagement;
typeof(AbpSerilogEnrichersUniqueIdModule),
typeof(AbpAuditLoggingElasticsearchModule),
typeof(AbpAspNetCoreSerilogModule),
+ typeof(AbpDistributedLockingModule),
typeof(AbpEntityFrameworkCoreMySQLModule),
typeof(AbpAspNetCoreAuthenticationJwtBearerModule),
typeof(AbpEmailingExceptionHandlingModule),