Browse Source

fix(tasks): the lock task does not block the listener from running

pull/461/head
cKey 4 years ago
parent
commit
27b5c69d12
  1. 19
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs
  2. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs
  3. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs
  4. 22
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs
  5. 56
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs
  6. 1
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj
  7. 61
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs
  8. 30
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs
  9. 6
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs
  10. 1
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj
  11. 2
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs

19
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/IJobLockProvider.cs

@ -0,0 +1,19 @@
using System.Threading;
using System.Threading.Tasks;
namespace LINGYUN.Abp.BackgroundTasks;
/// <summary>
/// 作业锁定提供者
/// </summary>
public interface IJobLockProvider
{
Task<bool> TryLockAsync(
string jobKey,
int lockSeconds,
CancellationToken cancellationToken = default);
Task<bool> TryReleaseAsync(
string jobKey,
CancellationToken cancellationToken = default);
}

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/AbpBackgroundTasksQuartzModule.cs

@ -15,5 +15,7 @@ public class AbpBackgroundTasksQuartzModule : AbpModule
var _scheduler = context.ServiceProvider.GetRequiredService<IScheduler>();
_scheduler.ListenerManager.AddJobListener(context.ServiceProvider.GetRequiredService<QuartzJobListener>());
_scheduler.ListenerManager.AddTriggerListener(context.ServiceProvider.GetRequiredService<QuartzTriggerListener>());
}
}

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs

@ -36,10 +36,12 @@ public class QuartzJobExecutorProvider : IQuartzJobExecutorProvider, ISingletonD
}
var adapterType = typeof(QuartzJobSimpleAdapter<>);
if (job.LockTimeOut > 0)
{
adapterType = typeof(QuartzJobConcurrentAdapter<>);
}
// 注释, 通过触发器监听锁定
//if (job.LockTimeOut > 0)
//{
// adapterType = typeof(QuartzJobConcurrentAdapter<>);
//}
if (!typeof(IJob).IsAssignableFrom(jobType))
{

22
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobListener.cs

@ -4,11 +4,12 @@ using Microsoft.Extensions.Logging.Abstractions;
using Quartz;
using Quartz.Listener;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Uow;
using Volo.Abp.Timing;
namespace LINGYUN.Abp.BackgroundTasks.Quartz;
@ -18,13 +19,16 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
public override string Name => "QuartzJobListener";
protected IClock Clock { get; }
protected IJobEventProvider EventProvider { get; }
protected IServiceProvider ServiceProvider { get; }
public QuartzJobListener(
IClock clock,
IServiceProvider serviceProvider,
IJobEventProvider eventProvider)
{
Clock = clock;
ServiceProvider = serviceProvider;
EventProvider = eventProvider;
@ -50,6 +54,12 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
{
try
{
var jobEventList = EventProvider.GetAll();
if (!jobEventList.Any())
{
return;
}
using var scope = ServiceProvider.CreateScope();
var jobEventData = new JobEventData(
jobUUId,
@ -60,7 +70,6 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
Result = context.Result?.ToString()
};
var jobEventList = EventProvider.GetAll();
var eventContext = new JobEventContext(
scope.ServiceProvider,
jobEventData);
@ -86,6 +95,12 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
{
try
{
var jobEventList = EventProvider.GetAll();
if (!jobEventList.Any())
{
return;
}
using var scope = ServiceProvider.CreateScope();
var jobId = context.GetString(nameof(JobInfo.Id));
if (Guid.TryParse(jobId, out var jobUUId))
@ -112,7 +127,7 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
jobEventData.RepeatCount = simpleTrigger.RepeatCount;
}
jobEventData.Description = context.JobDetail.Description;
jobEventData.RunTime = context.FireTimeUtc.LocalDateTime;
jobEventData.RunTime = Clock.Now;
jobEventData.LastRunTime = context.PreviousFireTimeUtc?.LocalDateTime;
jobEventData.NextRunTime = context.NextFireTimeUtc?.LocalDateTime;
if (context.Result != null)
@ -125,7 +140,6 @@ public class QuartzJobListener : JobListenerSupport, ISingletonDependency
jobEventData.TenantId = tenantId;
}
var jobEventList = EventProvider.GetAll();
var eventContext = new JobEventContext(
scope.ServiceProvider,
jobEventData);

56
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs

@ -0,0 +1,56 @@
using Quartz;
using Quartz.Listener;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace LINGYUN.Abp.BackgroundTasks.Quartz;
public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependency
{
protected const string LockKeyFormat = "p:abp-background-tasks,job:{0},key:{1}";
public override string Name => "QuartzTriggerListener";
protected IJobLockProvider JobLockProvider { get; }
public QuartzTriggerListener(
IJobLockProvider jobLockProvider)
{
JobLockProvider = jobLockProvider;
}
public override async Task<bool> VetoJobExecution(
ITrigger trigger,
IJobExecutionContext context,
CancellationToken cancellationToken = default)
{
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId);
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime);
if (jobId != null && lockTime != null && lockTime is int time && time > 0)
{
return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time, cancellationToken);
}
return false;
}
public override async Task TriggerComplete(
ITrigger trigger,
IJobExecutionContext context,
SchedulerInstruction triggerInstructionCode,
CancellationToken cancellationToken = default)
{
if (context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId) &&
context.MergedJobDataMap.ContainsKey(nameof(JobInfo.LockTimeOut)))
{
await JobLockProvider.TryReleaseAsync(NormalizeKey(context, jobId), cancellationToken);
}
}
protected virtual string NormalizeKey(IJobExecutionContext context, object jobId)
{
return string.Format(LockKeyFormat, context.JobDetail.JobType.Name, jobId.ToString());
}
}

1
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN.Abp.BackgroundTasks.csproj

@ -11,6 +11,7 @@
<ItemGroup>
<PackageReference Include="Volo.Abp.Auditing" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.BackgroundJobs.Abstractions" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Caching" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.DistributedLocking.Abstractions" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Guids" Version="$(VoloAbpPackageVersion)" />
</ItemGroup>

61
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobLockProvider.cs

@ -0,0 +1,61 @@
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking;
namespace LINGYUN.Abp.BackgroundTasks;
[Dependency(ReplaceServices = true)]
public class JobLockProvider : IJobLockProvider, ISingletonDependency
{
protected IMemoryCache LockCache { get; }
protected IAbpDistributedLock DistributedLock { get; }
public JobLockProvider(
IMemoryCache lockCache,
IAbpDistributedLock distributedLock)
{
LockCache = lockCache;
DistributedLock = distributedLock;
}
public virtual async Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default)
{
var handle = await DistributedLock.TryAcquireAsync(jobKey, cancellationToken: cancellationToken);
if (handle != null)
{
await LockCache.GetOrCreateAsync(jobKey, (entry) =>
{
entry.SetAbsoluteExpiration(TimeSpan.FromSeconds(lockSeconds));
entry.RegisterPostEvictionCallback(async (key, value, reason, state) =>
{
if (reason == EvictionReason.Expired && value is IAbpDistributedLockHandle handleValue)
{
await handleValue.DisposeAsync();
}
});
entry.SetValue(handle);
return Task.FromResult(handle);
});
return true;
}
return false;
}
public virtual async Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default)
{
if (LockCache.TryGetValue<IAbpDistributedLockHandle>(jobKey, out var handle))
{
await handle.DisposeAsync();
LockCache.Remove(jobKey);
return true;
}
return false;
}
}

30
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/JobRunnableExecuter.cs

@ -1,17 +1,13 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks;
public class JobRunnableExecuter : IJobRunnableExecuter, ISingletonDependency
{
protected const string LockKeyFormat = "p:{0},job:{1},key:{2}";
public async virtual Task ExecuteAsync(JobRunnableContext context)
{
Guid? tenantId = null;
@ -24,31 +20,7 @@ public class JobRunnableExecuter : IJobRunnableExecuter, ISingletonDependency
var currentTenant = context.ServiceProvider.GetRequiredService<ICurrentTenant>();
using (currentTenant.Change(tenantId))
{
context.JobData.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime);
// 某些提供者如果无法保证锁一致性, 那么需要用分布式锁
if (lockTime != null && (lockTime is int time && time > 0))
{
var jobId = context.JobData.GetOrDefault(nameof(JobInfo.Id));
var jobLockKey = string.Format(LockKeyFormat, tenantId?.ToString() ?? "Default", context.JobType.Name, jobId);
var distributedLock = context.ServiceProvider.GetRequiredService<IAbpDistributedLock>();
var handle = await distributedLock.TryAcquireAsync(jobLockKey, TimeSpan.FromSeconds(time));
if (handle == null)
{
// 抛出异常 通过监听器使其重试
throw new AbpBackgroundTaskConcurrentException(context.JobType);
}
await using (handle)
{
await InternalExecuteAsync(context);
}
}
else
{
await InternalExecuteAsync(context);
}
await InternalExecuteAsync(context);
}
}

6
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs

@ -156,8 +156,10 @@ public class BackgroundJobInfoAppService : TaskManagementApplicationService, IBa
backgroundJobInfo.MaxCount = input.MaxCount;
backgroundJobInfo.MaxTryCount = input.MaxTryCount;
backgroundJobInfo.Args.RemoveAll(x => !input.Args.ContainsKey(x.Key));
backgroundJobInfo.Args.AddIfNotContains(input.Args);
foreach (var arg in input.Args)
{
backgroundJobInfo.Args[arg.Key] = arg.Value;
}
backgroundJobInfo.SetPriority(input.Priority);
switch (input.JobType)

1
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/LY.MicroService.TaskManagement.HttpApi.Host.csproj

@ -38,6 +38,7 @@
<PackageReference Include="Volo.Abp.AspNetCore.MultiTenancy" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Autofac" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.DistributedLocking" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.EntityFrameworkCore.MySql" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Http.Client.IdentityModel.Web" Version="$(VoloAbpPackageVersion)" />

2
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs

@ -22,6 +22,7 @@ using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.AspNetCore.Serilog;
using Volo.Abp.Autofac;
using Volo.Abp.Caching.StackExchangeRedis;
using Volo.Abp.DistributedLocking;
using Volo.Abp.EntityFrameworkCore.MySQL;
using Volo.Abp.FeatureManagement.EntityFrameworkCore;
using Volo.Abp.Http.Client.IdentityModel.Web;
@ -38,6 +39,7 @@ namespace LY.MicroService.TaskManagement;
typeof(AbpSerilogEnrichersUniqueIdModule),
typeof(AbpAuditLoggingElasticsearchModule),
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpDistributedLockingModule),
typeof(AbpEntityFrameworkCoreMySQLModule),
typeof(AbpAspNetCoreAuthenticationJwtBearerModule),
typeof(AbpEmailingExceptionHandlingModule),

Loading…
Cancel
Save