Browse Source

fix(tasks): fixed concurrent job lock failure.

pull/539/head
cKey 4 years ago
parent
commit
c22e0b5c00
  1. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
  2. 9
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs
  3. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs
  4. 8
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs
  5. 7
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs

4
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs

@ -12,7 +12,7 @@ public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency
{ {
private readonly ConcurrentDictionary<string, JobLock> _localSyncObjects = new(); private readonly ConcurrentDictionary<string, JobLock> _localSyncObjects = new();
public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds)
{ {
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
if (_localSyncObjects.TryGetValue(jobKey, out var jobLock)) if (_localSyncObjects.TryGetValue(jobKey, out var jobLock))
@ -34,7 +34,7 @@ public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency
return Task.FromResult(_localSyncObjects.TryAdd(jobKey, jobLock)); return Task.FromResult(_localSyncObjects.TryAdd(jobKey, jobLock));
} }
public Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) public Task<bool> TryReleaseAsync(string jobKey)
{ {
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));

9
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs

@ -1,5 +1,4 @@
using System.Threading; using System.Threading.Tasks;
using System.Threading.Tasks;
namespace LINGYUN.Abp.BackgroundTasks; namespace LINGYUN.Abp.BackgroundTasks;
@ -10,10 +9,8 @@ public interface IJobLockProvider
{ {
Task<bool> TryLockAsync( Task<bool> TryLockAsync(
string jobKey, string jobKey,
int lockSeconds, int lockSeconds);
CancellationToken cancellationToken = default);
Task<bool> TryReleaseAsync( Task<bool> TryReleaseAsync(
string jobKey, string jobKey);
CancellationToken cancellationToken = default);
} }

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs

@ -38,7 +38,7 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
{ {
jobType = jobType.GetGenericArguments()[0]; jobType = jobType.GetGenericArguments()[0];
} }
Logger.LogInformation($"The task {jobType.Name} could not be performed..."); Logger.LogWarning($"The task {jobType.Name} could not be performed...");
return Task.FromResult(-1); return Task.FromResult(-1);
} }

8
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs

@ -27,10 +27,10 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc
{ {
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId); context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId);
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime); context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime);
if (jobId != null && lockTime != null && lockTime is int time && time > 0) if (jobId != null && lockTime != null && int.TryParse(lockTime.ToString(), out var time) && time > 0)
{ {
// 传递令牌将清除本次锁, 那并无意义
return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time, cancellationToken); return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time);
} }
return false; return false;
@ -45,7 +45,7 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc
if (context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId) && if (context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId) &&
context.MergedJobDataMap.ContainsKey(nameof(JobInfo.LockTimeOut))) context.MergedJobDataMap.ContainsKey(nameof(JobInfo.LockTimeOut)))
{ {
await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId), cancellationToken); await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId));
} }
} }

7
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs

@ -1,6 +1,5 @@
using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Caching.Memory;
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking; using Volo.Abp.DistributedLocking;
@ -21,9 +20,9 @@ public class JobLockProvider : IJobLockProvider, ISingletonDependency
DistributedLock = distributedLock; DistributedLock = distributedLock;
} }
public virtual async Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) public virtual async Task<bool> TryLockAsync(string jobKey, int lockSeconds)
{ {
var handle = await DistributedLock.TryAcquireAsync(jobKey, cancellationToken: cancellationToken); var handle = await DistributedLock.TryAcquireAsync(jobKey);
if (handle != null) if (handle != null)
{ {
await LockCache.GetOrCreateAsync(jobKey, (entry) => await LockCache.GetOrCreateAsync(jobKey, (entry) =>
@ -46,7 +45,7 @@ public class JobLockProvider : IJobLockProvider, ISingletonDependency
return false; return false;
} }
public virtual async Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) public virtual async Task<bool> TryReleaseAsync(string jobKey)
{ {
if (LockCache.TryGetValue<IAbpDistributedLockHandle>(jobKey, out var handle)) if (LockCache.TryGetValue<IAbpDistributedLockHandle>(jobKey, out var handle))
{ {

Loading…
Cancel
Save