Browse Source

feat(tasks): 增加默认基于内存字典的作业锁定提供者

pull/462/head
cKey 4 years ago
parent
commit
b8681c56f6
  1. 39
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
  2. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/ConsoleJob.cs
  3. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/SleepJob.cs
  4. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/IQuartzJobExecutorProvider.cs
  5. 3
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobScheduler.cs
  6. 12
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs
  7. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/en.json
  8. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/zh-Hans.json
  9. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs
  10. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs

39
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs

@ -0,0 +1,39 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
namespace LINGYUN.Abp.BackgroundTasks;
[Dependency(TryRegister = true)]
public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency
{
private readonly ConcurrentDictionary<string, SemaphoreSlim> _localSyncObjects = new();
public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
if (_localSyncObjects.ContainsKey(jobKey))
{
return Task.FromResult(false);
}
var semaphore = new SemaphoreSlim(1, 1);
return Task.FromResult(_localSyncObjects.TryAdd(jobKey, semaphore));
}
public Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
if (_localSyncObjects.TryGetValue(jobKey, out var semaphore))
{
semaphore.Dispose();
return Task.FromResult(_localSyncObjects.TryRemove(jobKey, out _));
}
return Task.FromResult(false);
}
}

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/ConsoleJob.cs

@ -9,7 +9,7 @@ public class ConsoleJob : IJobRunnable
public Task ExecuteAsync(JobRunnableContext context) public Task ExecuteAsync(JobRunnableContext context)
{ {
context.TryGetString(PropertyMessage, out var message); context.TryGetString(PropertyMessage, out var message);
Console.WriteLine($"[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}] - This message: {message ?? "None"} comes from the job: {GetType()}"); Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}] - This message: {message ?? "None"} comes from the job: {GetType()}");
return Task.CompletedTask; return Task.CompletedTask;
} }
} }

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/SleepJob.cs

@ -9,7 +9,7 @@ public class SleepJob : IJobRunnable
{ {
context.JobData.TryGetValue("Delay", out var sleep); context.JobData.TryGetValue("Delay", out var sleep);
Console.WriteLine($"Sleep {sleep ?? 20000} milliseconds."); Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}] - Sleep {sleep ?? 20000} milliseconds.");
await Task.Delay(sleep?.To<int>() ?? 20000); await Task.Delay(sleep?.To<int>() ?? 20000);
} }

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/IQuartzJobExecutorProvider.cs

@ -7,6 +7,6 @@ public interface IQuartzJobExecutorProvider
#nullable enable #nullable enable
IJobDetail? CreateJob(JobInfo job); IJobDetail? CreateJob(JobInfo job);
ITrigger CreateTrigger(JobInfo job); ITrigger? CreateTrigger(JobInfo job);
#nullable disable #nullable disable
} }

3
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobScheduler.cs

@ -1,7 +1,6 @@
using Quartz; using Quartz;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
namespace LINGYUN.Abp.BackgroundTasks.Quartz; namespace LINGYUN.Abp.BackgroundTasks.Quartz;
@ -84,7 +83,7 @@ public class QuartzJobScheduler : IJobScheduler, ISingletonDependency
continue; continue;
} }
jobDictionary.Add(jobDetail, new ITrigger[] { jobTrigger }); jobDictionary[jobDetail] = new ITrigger[] { jobTrigger };
} }
await Scheduler.ScheduleJobs(jobDictionary, false); await Scheduler.ScheduleJobs(jobDictionary, false);

12
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/JobExecutedEvent.cs

@ -34,7 +34,7 @@ public class JobExecutedEvent : JobEventBase<JobExecutedEvent>, ITransientDepend
job.IsAbandoned = false; job.IsAbandoned = false;
// 将任务标记为运行中, 会被轮询重新进入队列 // 将任务标记为运行中, 会被轮询重新进入队列
job.Status = JobStatus.FailedRetry; job.Status = JobStatus.FailedRetry;
job.Result = context.EventData.Exception.Message; job.Result = GetExceptionMessage(context.EventData.Exception);
// 多次异常后需要重新计算优先级 // 多次异常后需要重新计算优先级
if (job.TryCount <= (job.MaxTryCount / 2) && if (job.TryCount <= (job.MaxTryCount / 2) &&
@ -93,4 +93,14 @@ public class JobExecutedEvent : JobEventBase<JobExecutedEvent>, ITransientDepend
Logger.LogWarning($"An exception thow with job exception notify: {ex.Message}"); Logger.LogWarning($"An exception thow with job exception notify: {ex.Message}");
} }
} }
private string GetExceptionMessage(Exception exception)
{
if (exception.InnerException != null)
{
return exception.Message + " => " + GetExceptionMessage(exception.InnerException);
}
return exception.Message;
}
} }

4
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/en.json

@ -39,8 +39,8 @@
"Description:IsAbandoned": "If a job fails for several times and reaches the maximum number of retries, the job is marked as no longer being executed. You can manually add the job to the queue by changing the maximum number of retries.", "Description:IsAbandoned": "If a job fails for several times and reaches the maximum number of retries, the job is marked as no longer being executed. You can manually add the job to the queue by changing the maximum number of retries.",
"DisplayName:CreationTime": "CreationTime", "DisplayName:CreationTime": "CreationTime",
"DisplayName:Result": "Last Execution Result", "DisplayName:Result": "Last Execution Result",
"DisplayName:LastRunTime": "LastRunTime", "DisplayName:LastRunTime": "Last execution time",
"DisplayName:NextRunTime": "NextRunTime", "DisplayName:NextRunTime": "Next expected time",
"DisplayName:Cron": "Cron expression", "DisplayName:Cron": "Cron expression",
"Description:Cron": "Category is valid when periodic job, reference: https://www.quartz-scheduler.net/documentation/quartz-3.x/tutorial/crontrigger.html#introduction", "Description:Cron": "Category is valid when periodic job, reference: https://www.quartz-scheduler.net/documentation/quartz-3.x/tutorial/crontrigger.html#introduction",
"DisplayName:MaxCount": "Max Count", "DisplayName:MaxCount": "Max Count",

2
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/Localization/Resources/zh-Hans.json

@ -40,7 +40,7 @@
"DisplayName:CreationTime": "创建时间", "DisplayName:CreationTime": "创建时间",
"DisplayName:Result": "上次执行结果", "DisplayName:Result": "上次执行结果",
"DisplayName:LastRunTime": "上次执行时间", "DisplayName:LastRunTime": "上次执行时间",
"DisplayName:NextRunTime": "下次执行时间", "DisplayName:NextRunTime": "下次预期时间",
"DisplayName:Cron": "Cron表达式", "DisplayName:Cron": "Cron表达式",
"Description:Cron": "类别为周期性作业时有效, 参考: https://www.quartz-scheduler.net/documentation/quartz-3.x/tutorial/crontrigger.html#introduction", "Description:Cron": "类别为周期性作业时有效, 参考: https://www.quartz-scheduler.net/documentation/quartz-3.x/tutorial/crontrigger.html#introduction",
"DisplayName:MaxCount": "最大触发次数", "DisplayName:MaxCount": "最大触发次数",

2
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs

@ -116,6 +116,8 @@ public class BackgroundJobManager : DomainService
await JobScheduler.ResumeAsync(job); await JobScheduler.ResumeAsync(job);
jobInfo.SetStatus(JobStatus.Running); jobInfo.SetStatus(JobStatus.Running);
jobInfo.IsAbandoned = false;
jobInfo.IsEnabled = true;
await BackgroundJobInfoRepository.UpdateAsync(jobInfo); await BackgroundJobInfoRepository.UpdateAsync(jobInfo);
} }

2
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs

@ -58,6 +58,7 @@ public class EfCoreBackgroundJobInfoRepository :
return await (await GetDbSetAsync()) return await (await GetDbSetAsync())
.Where(x => x.IsEnabled && !x.IsAbandoned) .Where(x => x.IsEnabled && !x.IsAbandoned)
.Where(x => x.JobType == JobType.Period && status.Contains(x.Status)) .Where(x => x.JobType == JobType.Period && status.Contains(x.Status))
.Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
.OrderByDescending(x => x.Priority) .OrderByDescending(x => x.Priority)
.ToListAsync(GetCancellationToken(cancellationToken)); .ToListAsync(GetCancellationToken(cancellationToken));
} }
@ -114,6 +115,7 @@ public class EfCoreBackgroundJobInfoRepository :
return await (await GetDbSetAsync()) return await (await GetDbSetAsync())
.Where(x => x.IsEnabled && !x.IsAbandoned) .Where(x => x.IsEnabled && !x.IsAbandoned)
.Where(x => x.JobType != JobType.Period && status.Contains(x.Status)) .Where(x => x.JobType != JobType.Period && status.Contains(x.Status))
.Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
.OrderByDescending(x => x.Priority) .OrderByDescending(x => x.Priority)
.ThenBy(x => x.TryCount) .ThenBy(x => x.TryCount)
.ThenBy(x => x.NextRunTime) .ThenBy(x => x.NextRunTime)

Loading…
Cancel
Save