Browse Source

Merge pull request #12420 from abpframework/CancellationTokenInWorker

Add `CancellationToken` paramter to `AddAsync` and `DoWorkAsync` methods.
pull/12921/head
liangshiwei 4 years ago
committed by GitHub
parent
commit
4e2849ef23
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerBase.cs
  2. 8
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs
  3. 5
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfirePeriodicBackgroundWorkerAdapter.cs
  4. 7
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/IHangfireBackgroundWorker.cs
  5. 23
      framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs
  6. 4
      framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzPeriodicBackgroundWorkerAdapter.cs
  7. 13
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs
  8. 4
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs
  9. 11
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkersApplicationInitializationContextExtensions.cs
  10. 6
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IBackgroundWorkerManager.cs
  11. 10
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerContext.cs

7
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerBase.cs

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Volo.Abp.BackgroundWorkers.Hangfire; namespace Volo.Abp.BackgroundWorkers.Hangfire;
@ -8,12 +9,12 @@ public abstract class HangfireBackgroundWorkerBase : BackgroundWorkerBase, IHang
public string RecurringJobId { get; set; } public string RecurringJobId { get; set; }
public string CronExpression { get; set; } public string CronExpression { get; set; }
public TimeZoneInfo TimeZone { get; set; } public TimeZoneInfo TimeZone { get; set; }
public string Queue { get; set; } public string Queue { get; set; }
public abstract Task DoWorkAsync(); public abstract Task DoWorkAsync(CancellationToken cancellationToken = default);
protected HangfireBackgroundWorkerBase() protected HangfireBackgroundWorkerBase()
{ {

8
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

@ -33,19 +33,19 @@ public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingle
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task AddAsync(IBackgroundWorker worker) public Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{ {
if (worker is IHangfireBackgroundWorker hangfireBackgroundWorker) if (worker is IHangfireBackgroundWorker hangfireBackgroundWorker)
{ {
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker); var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace()) if (hangfireBackgroundWorker.RecurringJobId.IsNullOrWhiteSpace())
{ {
RecurringJob.AddOrUpdate(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(), RecurringJob.AddOrUpdate(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue); hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue);
} }
else else
{ {
RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(), RecurringJob.AddOrUpdate(hangfireBackgroundWorker.RecurringJobId, () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue); hangfireBackgroundWorker.CronExpression, hangfireBackgroundWorker.TimeZone, hangfireBackgroundWorker.Queue);
} }
} }
@ -80,7 +80,7 @@ public class HangfireBackgroundWorkerManager : IBackgroundWorkerManager, ISingle
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker)); var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker; var workerAdapter = Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker;
RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue); RecurringJob.AddOrUpdate(() => workerAdapter.DoWorkAsync(cancellationToken), GetCron(period.Value), workerAdapter.TimeZone, workerAdapter.Queue);
} }
return Task.CompletedTask; return Task.CompletedTask;

5
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfirePeriodicBackgroundWorkerAdapter.cs

@ -1,4 +1,5 @@
using System.Reflection; using System.Reflection;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -17,9 +18,9 @@ public class HangfirePeriodicBackgroundWorkerAdapter<TWorker> : HangfireBackgrou
_doWorkMethod = typeof(TWorker).GetMethod("DoWork", BindingFlags.Instance | BindingFlags.NonPublic); _doWorkMethod = typeof(TWorker).GetMethod("DoWork", BindingFlags.Instance | BindingFlags.NonPublic);
} }
public async override Task DoWorkAsync() public async override Task DoWorkAsync(CancellationToken cancellationToken = default)
{ {
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider); var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider, cancellationToken);
var worker = ServiceProvider.GetRequiredService<TWorker>(); var worker = ServiceProvider.GetRequiredService<TWorker>();
switch (worker) switch (worker)

7
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/IHangfireBackgroundWorker.cs

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Volo.Abp.BackgroundWorkers.Hangfire; namespace Volo.Abp.BackgroundWorkers.Hangfire;
@ -8,12 +9,10 @@ public interface IHangfireBackgroundWorker : IBackgroundWorker
string RecurringJobId { get; set; } string RecurringJobId { get; set; }
string CronExpression { get; set; } string CronExpression { get; set; }
TimeZoneInfo TimeZone { get; set; } TimeZoneInfo TimeZone { get; set; }
string Queue { get; set; } string Queue { get; set; }
Task DoWorkAsync(); Task DoWorkAsync(CancellationToken cancellationToken = default);
} }

23
framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs

@ -4,7 +4,6 @@ using System.Threading.Tasks;
using Quartz; using Quartz;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy; using Volo.Abp.DynamicProxy;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers.Quartz; namespace Volo.Abp.BackgroundWorkers.Quartz;
@ -34,12 +33,12 @@ public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingleto
} }
} }
public virtual async Task AddAsync(IBackgroundWorker worker) public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{ {
await ReScheduleJobAsync(worker); await ReScheduleJobAsync(worker, cancellationToken);
} }
protected virtual async Task ReScheduleJobAsync(IBackgroundWorker worker) protected virtual async Task ReScheduleJobAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{ {
if (worker is IQuartzBackgroundWorker quartzWork) if (worker is IQuartzBackgroundWorker quartzWork)
{ {
@ -52,7 +51,7 @@ public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingleto
} }
else else
{ {
await DefaultScheduleJobAsync(quartzWork); await DefaultScheduleJobAsync(quartzWork, cancellationToken);
} }
} }
else else
@ -65,22 +64,22 @@ public class QuartzBackgroundWorkerManager : IBackgroundWorkerManager, ISingleto
if (workerAdapter?.Trigger != null) if (workerAdapter?.Trigger != null)
{ {
await DefaultScheduleJobAsync(workerAdapter); await DefaultScheduleJobAsync(workerAdapter, cancellationToken);
} }
} }
} }
protected virtual async Task DefaultScheduleJobAsync(IQuartzBackgroundWorker quartzWork) protected virtual async Task DefaultScheduleJobAsync(IQuartzBackgroundWorker quartzWork, CancellationToken cancellationToken = default)
{ {
if (await _scheduler.CheckExists(quartzWork.JobDetail.Key)) if (await _scheduler.CheckExists(quartzWork.JobDetail.Key, cancellationToken))
{ {
await _scheduler.AddJob(quartzWork.JobDetail, true, true); await _scheduler.AddJob(quartzWork.JobDetail, true, true, cancellationToken);
await _scheduler.ResumeJob(quartzWork.JobDetail.Key); await _scheduler.ResumeJob(quartzWork.JobDetail.Key, cancellationToken);
await _scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger); await _scheduler.RescheduleJob(quartzWork.Trigger.Key, quartzWork.Trigger, cancellationToken);
} }
else else
{ {
await _scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger); await _scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken);
} }
} }
} }

4
framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzPeriodicBackgroundWorkerAdapter.cs

@ -67,10 +67,10 @@ public class QuartzPeriodicBackgroundWorkerAdapter<TWorker> : QuartzBackgroundWo
.Build(); .Build();
} }
public override async Task Execute(IJobExecutionContext context) public async override Task Execute(IJobExecutionContext context)
{ {
var worker = (IBackgroundWorker) ServiceProvider.GetService(typeof(TWorker)); var worker = (IBackgroundWorker) ServiceProvider.GetService(typeof(TWorker));
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider); var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider, context.CancellationToken);
switch (worker) switch (worker)
{ {

13
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs

@ -12,6 +12,7 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
{ {
protected IServiceScopeFactory ServiceScopeFactory { get; } protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpAsyncTimer Timer { get; } protected AbpAsyncTimer Timer { get; }
protected CancellationToken StartCancellationToken { get; set; }
protected AsyncPeriodicBackgroundWorkerBase( protected AsyncPeriodicBackgroundWorkerBase(
AbpAsyncTimer timer, AbpAsyncTimer timer,
@ -22,13 +23,15 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
Timer.Elapsed = Timer_Elapsed; Timer.Elapsed = Timer_Elapsed;
} }
public override async Task StartAsync(CancellationToken cancellationToken = default) public async override Task StartAsync(CancellationToken cancellationToken = default)
{ {
StartCancellationToken = cancellationToken;
await base.StartAsync(cancellationToken); await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken); Timer.Start(cancellationToken);
} }
public override async Task StopAsync(CancellationToken cancellationToken = default) public async override Task StopAsync(CancellationToken cancellationToken = default)
{ {
Timer.Stop(cancellationToken); Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken); await base.StopAsync(cancellationToken);
@ -36,16 +39,16 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
private async Task Timer_Elapsed(AbpAsyncTimer timer) private async Task Timer_Elapsed(AbpAsyncTimer timer)
{ {
await DoWorkAsync(); await DoWorkAsync(StartCancellationToken);
} }
private async Task DoWorkAsync() private async Task DoWorkAsync(CancellationToken cancellationToken = default)
{ {
using (var scope = ServiceScopeFactory.CreateScope()) using (var scope = ServiceScopeFactory.CreateScope())
{ {
try try
{ {
await DoWorkAsync(new PeriodicBackgroundWorkerContext(scope.ServiceProvider)); await DoWorkAsync(new PeriodicBackgroundWorkerContext(scope.ServiceProvider, cancellationToken));
} }
catch (Exception ex) catch (Exception ex)
{ {

4
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs

@ -26,13 +26,13 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
_backgroundWorkers = new List<IBackgroundWorker>(); _backgroundWorkers = new List<IBackgroundWorker>();
} }
public virtual async Task AddAsync(IBackgroundWorker worker) public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{ {
_backgroundWorkers.Add(worker); _backgroundWorkers.Add(worker);
if (IsRunning) if (IsRunning)
{ {
await worker.StartAsync(); await worker.StartAsync(cancellationToken);
} }
} }

11
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkersApplicationInitializationContextExtensions.cs

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using JetBrains.Annotations; using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -7,17 +8,17 @@ namespace Volo.Abp.BackgroundWorkers;
public static class BackgroundWorkersApplicationInitializationContextExtensions public static class BackgroundWorkersApplicationInitializationContextExtensions
{ {
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync<TWorker>([NotNull] this ApplicationInitializationContext context) public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync<TWorker>([NotNull] this ApplicationInitializationContext context, CancellationToken cancellationToken = default)
where TWorker : IBackgroundWorker where TWorker : IBackgroundWorker
{ {
Check.NotNull(context, nameof(context)); Check.NotNull(context, nameof(context));
await context.AddBackgroundWorkerAsync(typeof(TWorker)); await context.AddBackgroundWorkerAsync(typeof(TWorker), cancellationToken: cancellationToken);
return context; return context;
} }
public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync([NotNull] this ApplicationInitializationContext context, [NotNull] Type workerType) public async static Task<ApplicationInitializationContext> AddBackgroundWorkerAsync([NotNull] this ApplicationInitializationContext context, [NotNull] Type workerType, CancellationToken cancellationToken = default)
{ {
Check.NotNull(context, nameof(context)); Check.NotNull(context, nameof(context));
Check.NotNull(workerType, nameof(workerType)); Check.NotNull(workerType, nameof(workerType));
@ -29,9 +30,7 @@ public static class BackgroundWorkersApplicationInitializationContextExtensions
await context.ServiceProvider await context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>() .GetRequiredService<IBackgroundWorkerManager>()
.AddAsync( .AddAsync((IBackgroundWorker)context.ServiceProvider.GetRequiredService(workerType), cancellationToken);
(IBackgroundWorker)context.ServiceProvider.GetRequiredService(workerType)
);
return context; return context;
} }

6
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IBackgroundWorkerManager.cs

@ -1,4 +1,5 @@
using System.Threading.Tasks; using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Threading; using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers; namespace Volo.Abp.BackgroundWorkers;
@ -14,5 +15,6 @@ public interface IBackgroundWorkerManager : IRunnable
/// <param name="worker"> /// <param name="worker">
/// The worker. It should be resolved from IOC. /// The worker. It should be resolved from IOC.
/// </param> /// </param>
Task AddAsync(IBackgroundWorker worker); /// <param name="cancellationToken"></param>
Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default);
} }

10
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerContext.cs

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
namespace Volo.Abp.BackgroundWorkers; namespace Volo.Abp.BackgroundWorkers;
@ -6,8 +7,17 @@ public class PeriodicBackgroundWorkerContext
{ {
public IServiceProvider ServiceProvider { get; } public IServiceProvider ServiceProvider { get; }
public CancellationToken CancellationToken { get; }
public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider) public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider)
{ {
ServiceProvider = serviceProvider; ServiceProvider = serviceProvider;
CancellationToken = default;
}
public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
ServiceProvider = serviceProvider;
CancellationToken = cancellationToken;
} }
} }

Loading…
Cancel
Save