Browse Source

Merge pull request #23071 from abpframework/CornExpression

Add support for `Cron` expressions in background workers and refactor related classes
pull/23079/head
Enis Necipoglu 8 months ago
committed by GitHub
parent
commit
036f2edabe
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      docs/en/framework/infrastructure/background-workers/index.md
  2. 74
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs
  3. 46
      framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzPeriodicBackgroundWorkerAdapter.cs
  4. 5
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs
  5. 9
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs

3
docs/en/framework/infrastructure/background-workers/index.md

@ -41,6 +41,8 @@ Start your worker in the `StartAsync` (which is called when the application begi
Assume that we want to make a user passive, if the user has not logged in to the application in last 30 days. `AsyncPeriodicBackgroundWorkerBase` class simplifies to create periodic workers, so we will use it for the example below:
> You can use `CronExpression` property to set the cron expression for the background worker if you will use the [Hangfire Background Worker Manager](./hangfire.md) or [Quartz Background Worker Manager](./quartz.md).
````csharp
public class PassiveUserCheckerWorker : AsyncPeriodicBackgroundWorkerBase
{
@ -52,6 +54,7 @@ public class PassiveUserCheckerWorker : AsyncPeriodicBackgroundWorkerBase
serviceScopeFactory)
{
Timer.Period = 600000; //10 minutes
//CronExpression = "0 0/10 * * * ?"; //Run every 10 minutes, Only for Quartz or Hangfire integration.
}
protected async override Task DoWorkAsync(

74
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

@ -1,14 +1,15 @@
using System;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using Hangfire.Common;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
using Volo.Abp.Hangfire;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers.Hangfire;
@ -55,40 +56,40 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
}
case AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase:
{
var timer = worker.GetType().GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
var period = worker is AsyncPeriodicBackgroundWorkerBase ? ((AbpAsyncTimer?)timer)?.Period : ((AbpTimer?)timer)?.Period;
int? period = null;
string? CronExpression = null;
if (period == null)
if (worker is AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase)
{
return;
period = asyncPeriodicBackgroundWorkerBase.Period;
CronExpression = asyncPeriodicBackgroundWorkerBase.CronExpression;
}
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = (Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker)!;
if (workerAdapter.RecurringJobId.IsNullOrWhiteSpace())
else if (worker is PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase)
{
RecurringJob.AddOrUpdate(
() => workerAdapter.DoWorkAsync(cancellationToken),
GetCron(period.Value),
workerAdapter.TimeZone ,
workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue);
period = periodicBackgroundWorkerBase.Period;
CronExpression = periodicBackgroundWorkerBase.CronExpression;
}
else
{
RecurringJob.AddOrUpdate(
workerAdapter.RecurringJobId,
workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue,
() => workerAdapter.DoWorkAsync(cancellationToken),
GetCron(period.Value),
new RecurringJobOptions
{
TimeZone = workerAdapter.TimeZone
});
if (period == null && CronExpression.IsNullOrWhiteSpace())
{
return;
}
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = (Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker)!;
Expression<Func<Task>> methodCall = () => workerAdapter.DoWorkAsync(cancellationToken);
var recurringJobId = !workerAdapter.RecurringJobId.IsNullOrWhiteSpace() ? workerAdapter.RecurringJobId : GetRecurringJobId(worker, methodCall);
RecurringJob.AddOrUpdate(
recurringJobId,
workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue,
methodCall,
CronExpression ?? GetCron(period!.Value),
new RecurringJobOptions
{
TimeZone = workerAdapter.TimeZone
});
break;
}
default:
@ -97,6 +98,24 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
}
}
private readonly static MethodInfo? GetRecurringJobIdMethodInfo = typeof(RecurringJob).GetMethod("GetRecurringJobId", BindingFlags.NonPublic | BindingFlags.Static);
protected virtual string? GetRecurringJobId(IBackgroundWorker worker, Expression<Func<Task>> methodCall)
{
string? recurringJobId = null;
if (GetRecurringJobIdMethodInfo != null)
{
var job = Job.FromExpression(methodCall);
recurringJobId = (string)GetRecurringJobIdMethodInfo.Invoke(null, [job])!;
}
if (recurringJobId.IsNullOrWhiteSpace())
{
recurringJobId = $"HangfirePeriodicBackgroundWorkerAdapter<{worker.GetType().Name}>.DoWorkAsync";
}
return recurringJobId;
}
protected virtual string GetCron(int period)
{
var time = TimeSpan.FromMilliseconds(period);
@ -120,8 +139,7 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
}
else
{
throw new AbpException(
$"Cannot convert period: {period} to cron expression, use HangfireBackgroundWorkerBase to define worker");
throw new AbpException($"Cannot convert period: {period} to cron expression, use HangfireBackgroundWorkerBase to define worker");
}
return cron;

46
framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzPeriodicBackgroundWorkerAdapter.cs

@ -3,8 +3,6 @@ using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Quartz;
using Volo.Abp.DynamicProxy;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers.Quartz;
@ -25,28 +23,23 @@ public class QuartzPeriodicBackgroundWorkerAdapter<TWorker> : QuartzBackgroundWo
}
public void BuildWorker(IBackgroundWorker worker)
public virtual void BuildWorker(IBackgroundWorker worker)
{
int? period;
var workerType = ProxyHelper.GetUnProxiedType(worker);
int? period = null;
string? CronExpression = null;
if (worker is AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase)
if (worker is AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase)
{
if (typeof(TWorker) != workerType)
{
throw new ArgumentException($"{nameof(worker)} type is different from the generic type");
}
var timer = workerType.GetProperty("Timer", BindingFlags.Instance | BindingFlags.NonPublic)?.GetValue(worker);
period = worker is AsyncPeriodicBackgroundWorkerBase ? ((AbpAsyncTimer?)timer)?.Period : ((AbpTimer?)timer)?.Period;
period = asyncPeriodicBackgroundWorkerBase.Period;
CronExpression = asyncPeriodicBackgroundWorkerBase.CronExpression;
}
else
else if (worker is PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase)
{
return;
period = periodicBackgroundWorkerBase.Period;
CronExpression = periodicBackgroundWorkerBase.CronExpression;
}
if (period == null)
if (period == null && CronExpression.IsNullOrWhiteSpace())
{
return;
}
@ -55,10 +48,21 @@ public class QuartzPeriodicBackgroundWorkerAdapter<TWorker> : QuartzBackgroundWo
.Create<QuartzPeriodicBackgroundWorkerAdapter<TWorker>>()
.WithIdentity(BackgroundWorkerNameAttribute.GetName<TWorker>())
.Build();
Trigger = TriggerBuilder.Create()
.WithIdentity(BackgroundWorkerNameAttribute.GetName<TWorker>())
.WithSimpleSchedule(builder => builder.WithInterval(TimeSpan.FromMilliseconds(period.Value)).RepeatForever())
.Build();
var triggerBuilder = TriggerBuilder.Create()
.ForJob(JobDetail)
.WithIdentity(BackgroundWorkerNameAttribute.GetName<TWorker>());
if (!CronExpression.IsNullOrWhiteSpace())
{
triggerBuilder.WithCronSchedule(CronExpression);
}
else
{
triggerBuilder.WithSimpleSchedule(builder => builder.WithInterval(TimeSpan.FromMilliseconds(period!.Value)).RepeatForever());
}
Trigger = triggerBuilder.Build();
}
public async override Task Execute(IJobExecutionContext context)

5
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AsyncPeriodicBackgroundWorkerBase.cs

@ -13,6 +13,11 @@ public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpAsyncTimer Timer { get; }
protected CancellationToken StartCancellationToken { get; set; }
public int Period => Timer.Period;
/// <summary>
/// CronExpression has high priority over Period.
/// </summary>
public string? CronExpression { get; protected set; }
protected AsyncPeriodicBackgroundWorkerBase(
AbpAsyncTimer timer,

9
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs

@ -15,6 +15,11 @@ public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }
public int Period => Timer.Period;
/// <summary>
/// CronExpression has high priority over Period.
/// </summary>
public string? CronExpression { get; protected set; }
protected PeriodicBackgroundWorkerBase(
AbpTimer timer,
@ -25,13 +30,13 @@ public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
Timer.Elapsed += Timer_Elapsed;
}
public override async Task StartAsync(CancellationToken cancellationToken = default)
public async override Task StartAsync(CancellationToken cancellationToken = default)
{
await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken = default)
public async override Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken);

Loading…
Cancel
Save