|
|
|
@ -5,6 +5,7 @@ using Microsoft.Extensions.DependencyInjection; |
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
using Microsoft.Extensions.Options; |
|
|
|
using Volo.Abp.BackgroundWorkers; |
|
|
|
using Volo.Abp.DistributedLocking; |
|
|
|
using Volo.Abp.Threading; |
|
|
|
using Volo.Abp.Timing; |
|
|
|
|
|
|
|
@ -16,15 +17,19 @@ namespace Volo.Abp.BackgroundJobs |
|
|
|
|
|
|
|
protected AbpBackgroundJobWorkerOptions WorkerOptions { get; } |
|
|
|
|
|
|
|
protected IAbpDistributedLock DistributedLock { get; } |
|
|
|
|
|
|
|
public BackgroundJobWorker( |
|
|
|
AbpAsyncTimer timer, |
|
|
|
IOptions<AbpBackgroundJobOptions> jobOptions, |
|
|
|
IOptions<AbpBackgroundJobWorkerOptions> workerOptions, |
|
|
|
IServiceScopeFactory serviceScopeFactory) |
|
|
|
IServiceScopeFactory serviceScopeFactory, |
|
|
|
IAbpDistributedLock distributedLock) |
|
|
|
: base( |
|
|
|
timer, |
|
|
|
serviceScopeFactory) |
|
|
|
{ |
|
|
|
DistributedLock = distributedLock; |
|
|
|
WorkerOptions = workerOptions.Value; |
|
|
|
JobOptions = jobOptions.Value; |
|
|
|
Timer.Period = WorkerOptions.JobPollPeriod; |
|
|
|
@ -32,57 +37,74 @@ namespace Volo.Abp.BackgroundJobs |
|
|
|
|
|
|
|
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) |
|
|
|
{ |
|
|
|
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>(); |
|
|
|
|
|
|
|
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount); |
|
|
|
|
|
|
|
if (!waitingJobs.Any()) |
|
|
|
await using (var handler = await DistributedLock.TryAcquireAsync("")) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>(); |
|
|
|
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>(); |
|
|
|
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>(); |
|
|
|
|
|
|
|
foreach (var jobInfo in waitingJobs) |
|
|
|
{ |
|
|
|
jobInfo.TryCount++; |
|
|
|
jobInfo.LastTryTime = clock.Now; |
|
|
|
|
|
|
|
try |
|
|
|
if (handler != null) |
|
|
|
{ |
|
|
|
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); |
|
|
|
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); |
|
|
|
var context = new JobExecutionContext(workerContext.ServiceProvider, jobConfiguration.JobType, jobArgs); |
|
|
|
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>(); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
await jobExecuter.ExecuteAsync(context); |
|
|
|
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount); |
|
|
|
|
|
|
|
await store.DeleteAsync(jobInfo.Id); |
|
|
|
if (!waitingJobs.Any()) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
catch (BackgroundJobExecutionException) |
|
|
|
|
|
|
|
var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>(); |
|
|
|
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>(); |
|
|
|
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>(); |
|
|
|
|
|
|
|
foreach (var jobInfo in waitingJobs) |
|
|
|
{ |
|
|
|
var nextTryTime = CalculateNextTryTime(jobInfo, clock); |
|
|
|
jobInfo.TryCount++; |
|
|
|
jobInfo.LastTryTime = clock.Now; |
|
|
|
|
|
|
|
if (nextTryTime.HasValue) |
|
|
|
try |
|
|
|
{ |
|
|
|
jobInfo.NextTryTime = nextTryTime.Value; |
|
|
|
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); |
|
|
|
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); |
|
|
|
var context = new JobExecutionContext( |
|
|
|
workerContext.ServiceProvider, |
|
|
|
jobConfiguration.JobType, |
|
|
|
jobArgs); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
await jobExecuter.ExecuteAsync(context); |
|
|
|
|
|
|
|
await store.DeleteAsync(jobInfo.Id); |
|
|
|
} |
|
|
|
catch (BackgroundJobExecutionException) |
|
|
|
{ |
|
|
|
var nextTryTime = CalculateNextTryTime(jobInfo, clock); |
|
|
|
|
|
|
|
if (nextTryTime.HasValue) |
|
|
|
{ |
|
|
|
jobInfo.NextTryTime = nextTryTime.Value; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
jobInfo.IsAbandoned = true; |
|
|
|
} |
|
|
|
|
|
|
|
await TryUpdateAsync(store, jobInfo); |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
Logger.LogException(ex); |
|
|
|
jobInfo.IsAbandoned = true; |
|
|
|
await TryUpdateAsync(store, jobInfo); |
|
|
|
} |
|
|
|
|
|
|
|
await TryUpdateAsync(store, jobInfo); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
else |
|
|
|
{ |
|
|
|
Logger.LogException(ex); |
|
|
|
jobInfo.IsAbandoned = true; |
|
|
|
await TryUpdateAsync(store, jobInfo); |
|
|
|
try |
|
|
|
{ |
|
|
|
await Task.Delay(WorkerOptions.JobPollPeriod * 12, StoppingToken); |
|
|
|
} |
|
|
|
catch (TaskCanceledException) { } |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -101,7 +123,8 @@ namespace Volo.Abp.BackgroundJobs |
|
|
|
|
|
|
|
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock) |
|
|
|
{ |
|
|
|
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); |
|
|
|
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * |
|
|
|
(Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); |
|
|
|
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ?? |
|
|
|
clock.Now.AddSeconds(nextWaitDuration); |
|
|
|
|
|
|
|
@ -113,4 +136,4 @@ namespace Volo.Abp.BackgroundJobs |
|
|
|
return nextTryDate; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |