@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks ;
using System.Threading.Tasks ;
using Hangfire ;
using Hangfire ;
using Hangfire.Common ;
using Hangfire.Common ;
using Hangfire.Storage ;
using Microsoft.Extensions.DependencyInjection ;
using Microsoft.Extensions.DependencyInjection ;
using Microsoft.Extensions.Logging ;
using Microsoft.Extensions.Logging ;
using Microsoft.Extensions.Options ;
using Microsoft.Extensions.Options ;
@ -33,6 +34,7 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
public override async Task AddAsync ( IBackgroundWorker worker , CancellationToken cancellationToken = default )
public override async Task AddAsync ( IBackgroundWorker worker , CancellationToken cancellationToken = default )
{
{
var logger = ServiceProvider . GetRequiredService < ILogger < HangfireBackgroundWorkerManager > > ( ) ;
var abpHangfireOptions = ServiceProvider . GetRequiredService < IOptions < AbpHangfireOptions > > ( ) . Value ;
var abpHangfireOptions = ServiceProvider . GetRequiredService < IOptions < AbpHangfireOptions > > ( ) . Value ;
var defaultQueuePrefix = abpHangfireOptions . DefaultQueuePrefix ;
var defaultQueuePrefix = abpHangfireOptions . DefaultQueuePrefix ;
var defaultQueue = abpHangfireOptions . DefaultQueue ;
var defaultQueue = abpHangfireOptions . DefaultQueue ;
@ -43,15 +45,31 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
{
{
var unProxyWorker = ProxyHelper . UnProxy ( hangfireBackgroundWorker ) ;
var unProxyWorker = ProxyHelper . UnProxy ( hangfireBackgroundWorker ) ;
RecurringJob . AddOrUpdate (
var queueName = hangfireBackgroundWorker . Queue . IsNullOrWhiteSpace ( ) ? defaultQueue : defaultQueuePrefix + hangfireBackgroundWorker . Queue ;
hangfireBackgroundWorker . RecurringJobId ,
if ( ! JobStorage . Current . HasFeature ( JobStorageFeatures . JobQueueProperty ) )
hangfireBackgroundWorker . Queue . IsNullOrWhiteSpace ( ) ? defaultQueue : defaultQueuePrefix + hangfireBackgroundWorker . Queue ,
{
( ) = > ( ( IHangfireBackgroundWorker ) unProxyWorker ) . DoWorkAsync ( cancellationToken ) ,
logger . LogError ( $"Current storage doesn't support specifying queues({queueName}) directly for a specific job. Please use the QueueAttribute instead." ) ;
hangfireBackgroundWorker . CronExpression ,
RecurringJob . AddOrUpdate (
new RecurringJobOptions
hangfireBackgroundWorker . RecurringJobId ,
{
( ) = > ( ( IHangfireBackgroundWorker ) unProxyWorker ) . DoWorkAsync ( cancellationToken ) ,
TimeZone = hangfireBackgroundWorker . TimeZone
hangfireBackgroundWorker . CronExpression ,
} ) ;
new RecurringJobOptions
{
TimeZone = hangfireBackgroundWorker . TimeZone
} ) ;
}
else
{
RecurringJob . AddOrUpdate (
hangfireBackgroundWorker . RecurringJobId ,
queueName ,
( ) = > ( ( IHangfireBackgroundWorker ) unProxyWorker ) . DoWorkAsync ( cancellationToken ) ,
hangfireBackgroundWorker . CronExpression ,
new RecurringJobOptions
{
TimeZone = hangfireBackgroundWorker . TimeZone
} ) ;
}
break ;
break ;
}
}
@ -63,18 +81,17 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
switch ( worker )
switch ( worker )
{
{
case AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase :
case AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase :
period = asyncPeriodicBackgroundWorkerBase . Period ;
period = asyncPeriodicBackgroundWorkerBase . Period ;
cronExpression = asyncPeriodicBackgroundWorkerBase . CronExpression ;
cronExpression = asyncPeriodicBackgroundWorkerBase . CronExpression ;
break ;
break ;
case PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase :
case PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase :
period = periodicBackgroundWorkerBase . Period ;
period = periodicBackgroundWorkerBase . Period ;
cronExpression = periodicBackgroundWorkerBase . CronExpression ;
cronExpression = periodicBackgroundWorkerBase . CronExpression ;
break ;
break ;
}
}
if ( period = = null & & cronExpression . IsNullOrWhiteSpace ( ) )
if ( period = = null & & cronExpression . IsNullOrWhiteSpace ( ) )
{
{
var logger = ServiceProvider . GetRequiredService < ILogger < HangfireBackgroundWorkerManager > > ( ) ;
logger . LogError (
logger . LogError (
$"Cannot add periodic background worker {worker.GetType().FullName} to Hangfire scheduler, because both Period and CronExpression are not set. " +
$"Cannot add periodic background worker {worker.GetType().FullName} to Hangfire scheduler, because both Period and CronExpression are not set. " +
"You can either set Period or CronExpression property of the worker."
"You can either set Period or CronExpression property of the worker."
@ -86,15 +103,32 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
Expression < Func < Task > > methodCall = ( ) = > workerAdapter . DoWorkAsync ( cancellationToken ) ;
Expression < Func < Task > > methodCall = ( ) = > workerAdapter . DoWorkAsync ( cancellationToken ) ;
var recurringJobId = ! workerAdapter . RecurringJobId . IsNullOrWhiteSpace ( ) ? workerAdapter . RecurringJobId : GetRecurringJobId ( worker , methodCall ) ;
var recurringJobId = ! workerAdapter . RecurringJobId . IsNullOrWhiteSpace ( ) ? workerAdapter . RecurringJobId : GetRecurringJobId ( worker , methodCall ) ;
RecurringJob . AddOrUpdate (
var queueName = workerAdapter . Queue . IsNullOrWhiteSpace ( ) ? defaultQueue : defaultQueuePrefix + workerAdapter . Queue ;
recurringJobId ,
if ( ! JobStorage . Current . HasFeature ( JobStorageFeatures . JobQueueProperty ) )
workerAdapter . Queue . IsNullOrWhiteSpace ( ) ? defaultQueue : defaultQueuePrefix + workerAdapter . Queue ,
{
methodCall ,
logger . LogError ( $"Current storage doesn't support specifying queues({queueName}) directly for a specific job. Please use the QueueAttribute instead." ) ;
cronExpression ? ? GetCron ( period ! . Value ) ,
RecurringJob . AddOrUpdate (
new RecurringJobOptions
recurringJobId ,
{
methodCall ,
TimeZone = workerAdapter . TimeZone
cronExpression ? ? GetCron ( period ! . Value ) ,
} ) ;
new RecurringJobOptions
{
TimeZone = workerAdapter . TimeZone
} ) ;
}
else
{
RecurringJob . AddOrUpdate (
recurringJobId ,
queueName ,
methodCall ,
cronExpression ? ? GetCron ( period ! . Value ) ,
new RecurringJobOptions
{
TimeZone = workerAdapter . TimeZone
} ) ;
}
break ;
break ;
}
}
default :
default :