mirror of https://github.com/abpframework/abp.git
24 changed files with 889 additions and 631 deletions
@ -1,31 +1,48 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.ExceptionHandling; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Hangfire; |
|||
|
|||
public class HangfireDynamicBackgroundWorkerAdapter : ITransientDependency |
|||
{ |
|||
protected IDynamicBackgroundWorkerHandlerRegistry DynamicBackgroundWorkerHandlerRegistry { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
public ILogger<HangfireDynamicBackgroundWorkerAdapter> Logger { get; set; } |
|||
|
|||
public HangfireDynamicBackgroundWorkerAdapter( |
|||
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
DynamicBackgroundWorkerHandlerRegistry = dynamicBackgroundWorkerHandlerRegistry; |
|||
HandlerRegistry = handlerRegistry; |
|||
ServiceProvider = serviceProvider; |
|||
Logger = NullLogger<HangfireDynamicBackgroundWorkerAdapter>.Instance; |
|||
} |
|||
|
|||
public virtual async Task DoWorkAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
var handler = DynamicBackgroundWorkerHandlerRegistry.Get(workerName); |
|||
var handler = HandlerRegistry.Get(workerName); |
|||
if (handler == null) |
|||
{ |
|||
Logger.LogWarning("No handler registered for dynamic worker: {WorkerName}", workerName); |
|||
return; |
|||
} |
|||
|
|||
await handler(new DynamicBackgroundWorkerExecutionContext(workerName, ServiceProvider), cancellationToken); |
|||
try |
|||
{ |
|||
await handler(new DynamicBackgroundWorkerExecutionContext(workerName, ServiceProvider), cancellationToken); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
await ServiceProvider.GetRequiredService<IExceptionNotifier>() |
|||
.NotifyAsync(new ExceptionNotificationContext(ex)); |
|||
|
|||
throw; |
|||
} |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,168 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Hangfire; |
|||
using Hangfire.Storage; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Hangfire; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Hangfire; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class HangfireDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
public ILogger<HangfireDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
public HangfireDynamicBackgroundWorkerManager( |
|||
IServiceProvider serviceProvider, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
HandlerRegistry = handlerRegistry; |
|||
Logger = NullLogger<HangfireDynamicBackgroundWorkerManager>.Instance; |
|||
} |
|||
|
|||
public virtual Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var cronExpression = schedule.CronExpression; |
|||
if (cronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
var period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod; |
|||
cronExpression = GetCron(period); |
|||
} |
|||
|
|||
ScheduleRecurringJob(workerName, cronExpression, cancellationToken); |
|||
HandlerRegistry.Register(workerName, handler); |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
public virtual Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
if (!HandlerRegistry.IsRegistered(workerName)) |
|||
{ |
|||
return Task.FromResult(false); |
|||
} |
|||
|
|||
var recurringJobId = $"DynamicWorker:{workerName}"; |
|||
RecurringJob.RemoveIfExists(recurringJobId); |
|||
HandlerRegistry.Unregister(workerName); |
|||
|
|||
return Task.FromResult(true); |
|||
} |
|||
|
|||
public virtual Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (!HandlerRegistry.IsRegistered(workerName)) |
|||
{ |
|||
return Task.FromResult(false); |
|||
} |
|||
|
|||
var cronExpression = schedule.CronExpression; |
|||
if (cronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
var period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod; |
|||
cronExpression = GetCron(period); |
|||
} |
|||
|
|||
ScheduleRecurringJob(workerName, cronExpression, cancellationToken); |
|||
|
|||
return Task.FromResult(true); |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return HandlerRegistry.IsRegistered(workerName); |
|||
} |
|||
|
|||
protected virtual void ScheduleRecurringJob(string workerName, string cronExpression, CancellationToken cancellationToken) |
|||
{ |
|||
var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; |
|||
var queueName = abpHangfireOptions.DefaultQueue; |
|||
var recurringJobId = $"DynamicWorker:{workerName}"; |
|||
|
|||
if (!JobStorage.Current.HasFeature(JobStorageFeatures.JobQueueProperty)) |
|||
{ |
|||
Logger.LogWarning( |
|||
"Current storage doesn't support specifying queues ({QueueName}) directly for a specific job. Please use the QueueAttribute instead.", |
|||
queueName); |
|||
|
|||
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>( |
|||
recurringJobId, |
|||
adapter => adapter.DoWorkAsync(workerName, cancellationToken), |
|||
cronExpression, |
|||
new RecurringJobOptions |
|||
{ |
|||
TimeZone = TimeZoneInfo.Utc |
|||
}); |
|||
} |
|||
else |
|||
{ |
|||
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>( |
|||
recurringJobId, |
|||
queueName, |
|||
adapter => adapter.DoWorkAsync(workerName, cancellationToken), |
|||
cronExpression, |
|||
new RecurringJobOptions |
|||
{ |
|||
TimeZone = TimeZoneInfo.Utc |
|||
}); |
|||
} |
|||
} |
|||
|
|||
protected virtual string GetCron(int period) |
|||
{ |
|||
var time = TimeSpan.FromMilliseconds(period); |
|||
string cron; |
|||
|
|||
if (time.TotalSeconds <= 59) |
|||
{ |
|||
cron = $"*/{time.TotalSeconds} * * * * *"; |
|||
} |
|||
else if (time.TotalMinutes <= 59) |
|||
{ |
|||
cron = $"*/{time.TotalMinutes} * * * *"; |
|||
} |
|||
else if (time.TotalHours <= 23) |
|||
{ |
|||
cron = $"0 */{time.TotalHours} * * *"; |
|||
} |
|||
else if (time.TotalDays <= 31) |
|||
{ |
|||
cron = $"0 0 0 1/{time.TotalDays} * *"; |
|||
} |
|||
else |
|||
{ |
|||
throw new AbpException($"Cannot convert period: {period} to cron expression."); |
|||
} |
|||
|
|||
return cron; |
|||
} |
|||
} |
|||
@ -1,41 +1,56 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Quartz; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.ExceptionHandling; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Quartz; |
|||
|
|||
public class QuartzDynamicBackgroundWorkerAdapter : IJob, ITransientDependency |
|||
{ |
|||
protected IDynamicBackgroundWorkerHandlerRegistry DynamicBackgroundWorkerHandlerRegistry { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
public ILogger<QuartzDynamicBackgroundWorkerAdapter> Logger { get; set; } |
|||
|
|||
public QuartzDynamicBackgroundWorkerAdapter( |
|||
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
DynamicBackgroundWorkerHandlerRegistry = dynamicBackgroundWorkerHandlerRegistry; |
|||
HandlerRegistry = handlerRegistry; |
|||
ServiceProvider = serviceProvider; |
|||
Logger = NullLogger<QuartzDynamicBackgroundWorkerAdapter>.Instance; |
|||
} |
|||
|
|||
public virtual async Task Execute(IJobExecutionContext context) |
|||
{ |
|||
var workerName = context.MergedJobDataMap.GetString(QuartzBackgroundWorkerManager.DynamicWorkerNameKey); |
|||
var workerName = context.MergedJobDataMap.GetString(QuartzDynamicBackgroundWorkerManager.DynamicWorkerNameKey); |
|||
if (string.IsNullOrWhiteSpace(workerName)) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var nonNullWorkerName = workerName!; |
|||
var handler = DynamicBackgroundWorkerHandlerRegistry.Get(nonNullWorkerName); |
|||
var handler = HandlerRegistry.Get(workerName!); |
|||
if (handler == null) |
|||
{ |
|||
Logger.LogWarning("No handler registered for dynamic worker: {WorkerName}", workerName); |
|||
return; |
|||
} |
|||
|
|||
await handler( |
|||
new DynamicBackgroundWorkerExecutionContext(nonNullWorkerName, ServiceProvider), |
|||
context.CancellationToken |
|||
); |
|||
try |
|||
{ |
|||
await handler( |
|||
new DynamicBackgroundWorkerExecutionContext(workerName!, ServiceProvider), |
|||
context.CancellationToken); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
await ServiceProvider.GetRequiredService<IExceptionNotifier>() |
|||
.NotifyAsync(new ExceptionNotificationContext(ex)); |
|||
|
|||
throw; |
|||
} |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,143 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Quartz; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Quartz; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class QuartzDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
public const string DynamicWorkerNameKey = "AbpDynamicWorkerName"; |
|||
|
|||
protected IScheduler Scheduler { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
public ILogger<QuartzDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
public QuartzDynamicBackgroundWorkerManager( |
|||
IScheduler scheduler, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry) |
|||
{ |
|||
Scheduler = scheduler; |
|||
HandlerRegistry = handlerRegistry; |
|||
Logger = NullLogger<QuartzDynamicBackgroundWorkerManager>.Instance; |
|||
} |
|||
|
|||
public virtual async Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var jobKey = new JobKey($"DynamicWorker:{workerName}"); |
|||
var triggerKey = new TriggerKey($"DynamicWorker:{workerName}"); |
|||
var jobDetail = JobBuilder.Create<QuartzDynamicBackgroundWorkerAdapter>() |
|||
.WithIdentity(jobKey) |
|||
.UsingJobData(DynamicWorkerNameKey, workerName) |
|||
.Build(); |
|||
|
|||
var trigger = BuildTrigger(schedule, jobDetail, triggerKey); |
|||
|
|||
if (await Scheduler.CheckExists(jobDetail.Key, cancellationToken)) |
|||
{ |
|||
await Scheduler.AddJob(jobDetail, true, true, cancellationToken); |
|||
await Scheduler.ResumeJob(jobDetail.Key, cancellationToken); |
|||
await Scheduler.RescheduleJob(trigger.Key, trigger, cancellationToken); |
|||
} |
|||
else |
|||
{ |
|||
await Scheduler.ScheduleJob(jobDetail, trigger, cancellationToken); |
|||
} |
|||
|
|||
HandlerRegistry.Register(workerName, handler); |
|||
} |
|||
|
|||
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
if (!HandlerRegistry.IsRegistered(workerName)) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
var jobKey = new JobKey($"DynamicWorker:{workerName}"); |
|||
var deleted = await Scheduler.DeleteJob(jobKey, cancellationToken); |
|||
if (deleted) |
|||
{ |
|||
HandlerRegistry.Unregister(workerName); |
|||
} |
|||
|
|||
return deleted; |
|||
} |
|||
|
|||
public virtual async Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (!HandlerRegistry.IsRegistered(workerName)) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
var triggerKey = new TriggerKey($"DynamicWorker:{workerName}"); |
|||
var jobKey = new JobKey($"DynamicWorker:{workerName}"); |
|||
|
|||
var triggerBuilder = TriggerBuilder.Create() |
|||
.WithIdentity(triggerKey) |
|||
.ForJob(jobKey); |
|||
|
|||
if (!schedule.CronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
triggerBuilder.WithCronSchedule(schedule.CronExpression); |
|||
} |
|||
else |
|||
{ |
|||
triggerBuilder.WithSimpleSchedule(builder => |
|||
builder.WithInterval(TimeSpan.FromMilliseconds(schedule.Period!.Value)).RepeatForever()); |
|||
} |
|||
|
|||
var result = await Scheduler.RescheduleJob(triggerKey, triggerBuilder.Build(), cancellationToken); |
|||
return result != null; |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return HandlerRegistry.IsRegistered(workerName); |
|||
} |
|||
|
|||
protected virtual ITrigger BuildTrigger(DynamicBackgroundWorkerSchedule schedule, IJobDetail jobDetail, TriggerKey triggerKey) |
|||
{ |
|||
var triggerBuilder = TriggerBuilder.Create() |
|||
.ForJob(jobDetail) |
|||
.WithIdentity(triggerKey); |
|||
|
|||
if (!schedule.CronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
triggerBuilder.WithCronSchedule(schedule.CronExpression); |
|||
} |
|||
else |
|||
{ |
|||
triggerBuilder.WithSimpleSchedule(builder => |
|||
builder.WithInterval(TimeSpan.FromMilliseconds(schedule.Period!.Value)).RepeatForever()); |
|||
} |
|||
|
|||
return triggerBuilder.Build(); |
|||
} |
|||
} |
|||
@ -0,0 +1,164 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using TickerQ.Utilities.Entities; |
|||
using TickerQ.Utilities.Enums; |
|||
using TickerQ.Utilities.Interfaces.Managers; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.TickerQ; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.TickerQ; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
protected AbpTickerQFunctionProvider AbpTickerQFunctionProvider { get; } |
|||
protected AbpTickerQBackgroundWorkersProvider AbpTickerQBackgroundWorkersProvider { get; } |
|||
protected ICronTickerManager<CronTickerEntity> CronTickerManager { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
public ILogger<TickerQDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
public TickerQDynamicBackgroundWorkerManager( |
|||
AbpTickerQFunctionProvider abpTickerQFunctionProvider, |
|||
AbpTickerQBackgroundWorkersProvider abpTickerQBackgroundWorkersProvider, |
|||
ICronTickerManager<CronTickerEntity> cronTickerManager, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry) |
|||
{ |
|||
AbpTickerQFunctionProvider = abpTickerQFunctionProvider; |
|||
AbpTickerQBackgroundWorkersProvider = abpTickerQBackgroundWorkersProvider; |
|||
CronTickerManager = cronTickerManager; |
|||
HandlerRegistry = handlerRegistry; |
|||
Logger = NullLogger<TickerQDynamicBackgroundWorkerManager>.Instance; |
|||
} |
|||
|
|||
public virtual async Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var cronExpression = schedule.CronExpression ?? GetCron(schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod); |
|||
var functionName = $"DynamicWorker:{workerName}"; |
|||
|
|||
AbpTickerQFunctionProvider.Functions[functionName] = |
|||
(string.Empty, TickerTaskPriority.LongRunning, async (tickerCancellationToken, serviceProvider, _) => |
|||
{ |
|||
var registeredHandler = HandlerRegistry.Get(workerName); |
|||
if (registeredHandler == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
await registeredHandler( |
|||
new DynamicBackgroundWorkerExecutionContext(workerName, serviceProvider), |
|||
tickerCancellationToken); |
|||
}, 0); |
|||
|
|||
AbpTickerQBackgroundWorkersProvider.BackgroundWorkers[functionName] = new AbpTickerQCronBackgroundWorker |
|||
{ |
|||
Function = functionName, |
|||
CronExpression = cronExpression, |
|||
WorkerType = typeof(TickerQDynamicBackgroundWorkerManager) |
|||
}; |
|||
|
|||
await CronTickerManager.AddAsync(new CronTickerEntity |
|||
{ |
|||
Function = functionName, |
|||
Expression = cronExpression |
|||
}); |
|||
|
|||
HandlerRegistry.Register(workerName, handler); |
|||
} |
|||
|
|||
public virtual Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
if (!HandlerRegistry.IsRegistered(workerName)) |
|||
{ |
|||
return Task.FromResult(false); |
|||
} |
|||
|
|||
var functionName = $"DynamicWorker:{workerName}"; |
|||
AbpTickerQFunctionProvider.Functions.Remove(functionName); |
|||
AbpTickerQBackgroundWorkersProvider.BackgroundWorkers.Remove(functionName); |
|||
HandlerRegistry.Unregister(workerName); |
|||
|
|||
return Task.FromResult(true); |
|||
} |
|||
|
|||
public virtual async Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (!HandlerRegistry.IsRegistered(workerName)) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
var cronExpression = schedule.CronExpression ?? GetCron(schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod); |
|||
var functionName = $"DynamicWorker:{workerName}"; |
|||
|
|||
if (AbpTickerQBackgroundWorkersProvider.BackgroundWorkers.TryGetValue(functionName, out var existingWorker)) |
|||
{ |
|||
existingWorker.CronExpression = cronExpression; |
|||
} |
|||
|
|||
await CronTickerManager.AddAsync(new CronTickerEntity |
|||
{ |
|||
Function = functionName, |
|||
Expression = cronExpression |
|||
}); |
|||
|
|||
return true; |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return HandlerRegistry.IsRegistered(workerName); |
|||
} |
|||
|
|||
protected virtual string GetCron(int period) |
|||
{ |
|||
var time = TimeSpan.FromMilliseconds(period); |
|||
if (time.TotalMinutes < 1) |
|||
{ |
|||
return "* * * * *"; |
|||
} |
|||
|
|||
if (time.TotalMinutes < 60) |
|||
{ |
|||
var minutes = (int)Math.Round(time.TotalMinutes); |
|||
return $"*/{minutes} * * * *"; |
|||
} |
|||
|
|||
if (time.TotalHours < 24) |
|||
{ |
|||
var hours = (int)Math.Round(time.TotalHours); |
|||
return $"0 */{hours} * * *"; |
|||
} |
|||
|
|||
if (time.TotalDays <= 31) |
|||
{ |
|||
var days = (int)Math.Round(time.TotalDays); |
|||
return $"0 0 */{days} * *"; |
|||
} |
|||
|
|||
throw new AbpException($"Cannot convert period: {period} to cron expression."); |
|||
} |
|||
} |
|||
@ -0,0 +1,144 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency, IDisposable |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
public ILogger<DefaultDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
private readonly ConcurrentDictionary<string, InMemoryDynamicBackgroundWorker> _dynamicWorkers; |
|||
private bool _isDisposed; |
|||
|
|||
public DefaultDynamicBackgroundWorkerManager(IServiceProvider serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Logger = NullLogger<DefaultDynamicBackgroundWorkerManager>.Instance; |
|||
_dynamicWorkers = new ConcurrentDictionary<string, InMemoryDynamicBackgroundWorker>(); |
|||
} |
|||
|
|||
public virtual async Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (schedule.Period == null && !string.IsNullOrWhiteSpace(schedule.CronExpression)) |
|||
{ |
|||
throw new AbpException( |
|||
$"The default in-memory background worker manager does not support CronExpression without Period for dynamic worker '{workerName}'. " + |
|||
"Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ)."); |
|||
} |
|||
|
|||
if (_dynamicWorkers.TryRemove(workerName, out var existingWorker)) |
|||
{ |
|||
await existingWorker.StopAsync(cancellationToken); |
|||
Logger.LogInformation("Replaced existing dynamic worker: {WorkerName}", workerName); |
|||
} |
|||
|
|||
var worker = CreateDynamicWorker(workerName, schedule, handler); |
|||
_dynamicWorkers[workerName] = worker; |
|||
|
|||
await worker.StartAsync(cancellationToken); |
|||
} |
|||
|
|||
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
if (!_dynamicWorkers.TryRemove(workerName, out var worker)) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
await worker.StopAsync(cancellationToken); |
|||
return true; |
|||
} |
|||
|
|||
public virtual Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (schedule.Period == null && !string.IsNullOrWhiteSpace(schedule.CronExpression)) |
|||
{ |
|||
throw new AbpException( |
|||
$"The default in-memory background worker manager does not support CronExpression without Period for dynamic worker '{workerName}'. " + |
|||
"Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ)."); |
|||
} |
|||
|
|||
if (!_dynamicWorkers.TryGetValue(workerName, out var worker)) |
|||
{ |
|||
return Task.FromResult(false); |
|||
} |
|||
|
|||
worker.UpdateSchedule(schedule); |
|||
return Task.FromResult(true); |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return _dynamicWorkers.ContainsKey(workerName); |
|||
} |
|||
|
|||
public virtual void Dispose() |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
_isDisposed = true; |
|||
|
|||
foreach (var kvp in _dynamicWorkers) |
|||
{ |
|||
try |
|||
{ |
|||
kvp.Value.StopAsync(CancellationToken.None).GetAwaiter().GetResult(); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
Logger.LogException(ex); |
|||
} |
|||
} |
|||
|
|||
_dynamicWorkers.Clear(); |
|||
} |
|||
|
|||
protected virtual InMemoryDynamicBackgroundWorker CreateDynamicWorker( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler) |
|||
{ |
|||
var timer = ServiceProvider.GetRequiredService<AbpAsyncTimer>(); |
|||
var serviceScopeFactory = ServiceProvider.GetRequiredService<IServiceScopeFactory>(); |
|||
|
|||
var worker = new InMemoryDynamicBackgroundWorker( |
|||
workerName, schedule, handler, timer, serviceScopeFactory); |
|||
|
|||
worker.ServiceProvider = ServiceProvider; |
|||
worker.LazyServiceProvider = ServiceProvider.GetRequiredService<IAbpLazyServiceProvider>(); |
|||
|
|||
return worker; |
|||
} |
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public static class DynamicBackgroundWorkerManagerExtensions |
|||
{ |
|||
/// <summary>
|
|||
/// Adds a dynamic worker with the default schedule (<see cref="DynamicBackgroundWorkerSchedule.DefaultPeriod"/>).
|
|||
/// </summary>
|
|||
public static Task AddAsync( |
|||
this IDynamicBackgroundWorkerManager manager, |
|||
string workerName, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
return manager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = DynamicBackgroundWorkerSchedule.DefaultPeriod |
|||
}, |
|||
handler, |
|||
cancellationToken); |
|||
} |
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
/// <summary>
|
|||
/// Manages dynamic background workers that are registered at runtime
|
|||
/// without requiring a strongly-typed worker class.
|
|||
/// </summary>
|
|||
public interface IDynamicBackgroundWorkerManager |
|||
{ |
|||
/// <summary>
|
|||
/// Adds a dynamic worker by name, schedule and handler.
|
|||
/// If a worker with the same name already exists, it will be replaced.
|
|||
/// </summary>
|
|||
Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler, |
|||
CancellationToken cancellationToken = default); |
|||
|
|||
/// <summary>
|
|||
/// Removes a previously added dynamic worker by name.
|
|||
/// Returns true if the worker was found and removed; false otherwise.
|
|||
/// </summary>
|
|||
Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default); |
|||
|
|||
/// <summary>
|
|||
/// Updates the schedule of a previously added dynamic worker.
|
|||
/// Returns true if the worker was found and updated; false otherwise.
|
|||
/// </summary>
|
|||
Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default); |
|||
|
|||
/// <summary>
|
|||
/// Checks whether a dynamic worker with the given name is registered.
|
|||
/// </summary>
|
|||
bool IsRegistered(string workerName); |
|||
} |
|||
Loading…
Reference in new issue