Browse Source

Handle Hangfire storage without JobQueueProperty feature.

pull/24682/head
maliming 3 weeks ago
parent
commit
5c41632d84
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 80
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

80
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Hangfire; using Hangfire;
using Hangfire.Common; using Hangfire.Common;
using Hangfire.Storage;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -33,6 +34,7 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
public override async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default) public override async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{ {
var logger = ServiceProvider.GetRequiredService<ILogger<HangfireBackgroundWorkerManager>>();
var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
var defaultQueuePrefix = abpHangfireOptions.DefaultQueuePrefix; var defaultQueuePrefix = abpHangfireOptions.DefaultQueuePrefix;
var defaultQueue = abpHangfireOptions.DefaultQueue; var defaultQueue = abpHangfireOptions.DefaultQueue;
@ -43,15 +45,31 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
{ {
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker); var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
RecurringJob.AddOrUpdate( var queueName = hangfireBackgroundWorker.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + hangfireBackgroundWorker.Queue;
hangfireBackgroundWorker.RecurringJobId, if (!JobStorage.Current.HasFeature(JobStorageFeatures.JobQueueProperty))
hangfireBackgroundWorker.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + hangfireBackgroundWorker.Queue, {
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken), logger.LogError($"Current storage doesn't support specifying queues({queueName}) directly for a specific job. Please use the QueueAttribute instead.");
hangfireBackgroundWorker.CronExpression, RecurringJob.AddOrUpdate(
new RecurringJobOptions hangfireBackgroundWorker.RecurringJobId,
{ () => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
TimeZone = hangfireBackgroundWorker.TimeZone hangfireBackgroundWorker.CronExpression,
}); new RecurringJobOptions
{
TimeZone = hangfireBackgroundWorker.TimeZone
});
}
else
{
RecurringJob.AddOrUpdate(
hangfireBackgroundWorker.RecurringJobId,
queueName,
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression,
new RecurringJobOptions
{
TimeZone = hangfireBackgroundWorker.TimeZone
});
}
break; break;
} }
@ -63,18 +81,17 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
switch (worker) switch (worker)
{ {
case AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase: case AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase:
period = asyncPeriodicBackgroundWorkerBase.Period; period = asyncPeriodicBackgroundWorkerBase.Period;
cronExpression = asyncPeriodicBackgroundWorkerBase.CronExpression; cronExpression = asyncPeriodicBackgroundWorkerBase.CronExpression;
break; break;
case PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase: case PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase:
period = periodicBackgroundWorkerBase.Period; period = periodicBackgroundWorkerBase.Period;
cronExpression = periodicBackgroundWorkerBase.CronExpression; cronExpression = periodicBackgroundWorkerBase.CronExpression;
break; break;
} }
if (period == null && cronExpression.IsNullOrWhiteSpace()) if (period == null && cronExpression.IsNullOrWhiteSpace())
{ {
var logger = ServiceProvider.GetRequiredService<ILogger<HangfireBackgroundWorkerManager>>();
logger.LogError( logger.LogError(
$"Cannot add periodic background worker {worker.GetType().FullName} to Hangfire scheduler, because both Period and CronExpression are not set. " + $"Cannot add periodic background worker {worker.GetType().FullName} to Hangfire scheduler, because both Period and CronExpression are not set. " +
"You can either set Period or CronExpression property of the worker." "You can either set Period or CronExpression property of the worker."
@ -86,15 +103,32 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
Expression<Func<Task>> methodCall = () => workerAdapter.DoWorkAsync(cancellationToken); Expression<Func<Task>> methodCall = () => workerAdapter.DoWorkAsync(cancellationToken);
var recurringJobId = !workerAdapter.RecurringJobId.IsNullOrWhiteSpace() ? workerAdapter.RecurringJobId : GetRecurringJobId(worker, methodCall); var recurringJobId = !workerAdapter.RecurringJobId.IsNullOrWhiteSpace() ? workerAdapter.RecurringJobId : GetRecurringJobId(worker, methodCall);
RecurringJob.AddOrUpdate( var queueName = workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue;
recurringJobId, if (!JobStorage.Current.HasFeature(JobStorageFeatures.JobQueueProperty))
workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue, {
methodCall, logger.LogError($"Current storage doesn't support specifying queues({queueName}) directly for a specific job. Please use the QueueAttribute instead.");
cronExpression ?? GetCron(period!.Value), RecurringJob.AddOrUpdate(
new RecurringJobOptions recurringJobId,
{ methodCall,
TimeZone = workerAdapter.TimeZone cronExpression ?? GetCron(period!.Value),
}); new RecurringJobOptions
{
TimeZone = workerAdapter.TimeZone
});
}
else
{
RecurringJob.AddOrUpdate(
recurringJobId,
queueName,
methodCall,
cronExpression ?? GetCron(period!.Value),
new RecurringJobOptions
{
TimeZone = workerAdapter.TimeZone
});
}
break; break;
} }
default: default:

Loading…
Cancel
Save