Browse Source
Merge pull request #539 from colinin/fix-concurrent-job-lock
fix(tasks): fixed concurrent job lock failure.
pull/580/head
yx lin
4 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with
13 additions and
17 deletions
-
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
-
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs
-
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs
-
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs
-
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs
|
|
|
@ -12,7 +12,7 @@ public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency |
|
|
|
{ |
|
|
|
private readonly ConcurrentDictionary<string, JobLock> _localSyncObjects = new(); |
|
|
|
|
|
|
|
public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) |
|
|
|
public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds) |
|
|
|
{ |
|
|
|
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); |
|
|
|
if (_localSyncObjects.TryGetValue(jobKey, out var jobLock)) |
|
|
|
@ -34,7 +34,7 @@ public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency |
|
|
|
return Task.FromResult(_localSyncObjects.TryAdd(jobKey, jobLock)); |
|
|
|
} |
|
|
|
|
|
|
|
public Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) |
|
|
|
public Task<bool> TryReleaseAsync(string jobKey) |
|
|
|
{ |
|
|
|
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); |
|
|
|
|
|
|
|
|
|
|
|
@ -1,5 +1,4 @@ |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using System.Threading.Tasks; |
|
|
|
|
|
|
|
namespace LINGYUN.Abp.BackgroundTasks; |
|
|
|
|
|
|
|
@ -10,10 +9,8 @@ public interface IJobLockProvider |
|
|
|
{ |
|
|
|
Task<bool> TryLockAsync( |
|
|
|
string jobKey, |
|
|
|
int lockSeconds, |
|
|
|
CancellationToken cancellationToken = default); |
|
|
|
int lockSeconds); |
|
|
|
|
|
|
|
Task<bool> TryReleaseAsync( |
|
|
|
string jobKey, |
|
|
|
CancellationToken cancellationToken = default); |
|
|
|
string jobKey); |
|
|
|
} |
|
|
|
|
|
|
|
@ -38,7 +38,7 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency |
|
|
|
{ |
|
|
|
jobType = jobType.GetGenericArguments()[0]; |
|
|
|
} |
|
|
|
Logger.LogInformation($"The task {jobType.Name} could not be performed..."); |
|
|
|
Logger.LogWarning($"The task {jobType.Name} could not be performed..."); |
|
|
|
|
|
|
|
return Task.FromResult(-1); |
|
|
|
} |
|
|
|
|
|
|
|
@ -27,10 +27,10 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc |
|
|
|
{ |
|
|
|
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId); |
|
|
|
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime); |
|
|
|
if (jobId != null && lockTime != null && lockTime is int time && time > 0) |
|
|
|
if (jobId != null && lockTime != null && int.TryParse(lockTime.ToString(), out var time) && time > 0) |
|
|
|
{ |
|
|
|
|
|
|
|
return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time, cancellationToken); |
|
|
|
// 传递令牌将清除本次锁, 那并无意义
|
|
|
|
return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time); |
|
|
|
} |
|
|
|
|
|
|
|
return false; |
|
|
|
@ -45,7 +45,7 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc |
|
|
|
if (context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId) && |
|
|
|
context.MergedJobDataMap.ContainsKey(nameof(JobInfo.LockTimeOut))) |
|
|
|
{ |
|
|
|
await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId), cancellationToken); |
|
|
|
await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1,6 +1,5 @@ |
|
|
|
using Microsoft.Extensions.Caching.Memory; |
|
|
|
using System; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using Volo.Abp.DependencyInjection; |
|
|
|
using Volo.Abp.DistributedLocking; |
|
|
|
@ -21,9 +20,9 @@ public class JobLockProvider : IJobLockProvider, ISingletonDependency |
|
|
|
DistributedLock = distributedLock; |
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) |
|
|
|
public virtual async Task<bool> TryLockAsync(string jobKey, int lockSeconds) |
|
|
|
{ |
|
|
|
var handle = await DistributedLock.TryAcquireAsync(jobKey, cancellationToken: cancellationToken); |
|
|
|
var handle = await DistributedLock.TryAcquireAsync(jobKey); |
|
|
|
if (handle != null) |
|
|
|
{ |
|
|
|
await LockCache.GetOrCreateAsync(jobKey, (entry) => |
|
|
|
@ -46,7 +45,7 @@ public class JobLockProvider : IJobLockProvider, ISingletonDependency |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) |
|
|
|
public virtual async Task<bool> TryReleaseAsync(string jobKey) |
|
|
|
{ |
|
|
|
if (LockCache.TryGetValue<IAbpDistributedLockHandle>(jobKey, out var handle)) |
|
|
|
{ |
|
|
|
|