Browse Source

Add dynamic background worker support

Introduce runtime dynamic background workers: add DynamicBackgroundWorkerExecutionContext, DynamicBackgroundWorkerSchedule, IDynamicBackgroundWorkerHandlerRegistry and its implementation. Extend IBackgroundWorkerManager with AddAsync overloads to register handlers by name and schedule. Provide InMemoryDynamicBackgroundWorker for in-process execution and provider-specific adapters/implementations for Hangfire, Quartz and TickerQ (including Hangfire/Quartz/TickerQ adapters and manager changes) to schedule and execute dynamic handlers. Update BackgroundWorkerManager to hold IServiceProvider and the handler registry and wire DI through constructors. Add a docs example and unit tests to verify handler registration and execution.
pull/25066/head
SALİH ÖZKARA 3 weeks ago
parent
commit
dcf08ec627
  1. 27
      docs/en/framework/infrastructure/background-workers/index.md
  2. 2
      framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobConfiguration.cs
  3. 2
      framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs
  4. 2
      framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs
  5. 2
      framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs
  6. 2
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs
  7. 2
      framework/src/Volo.Abp.BackgroundJobs.TickerQ/Volo/Abp/BackgroundJobs/TickerQ/AbpBackgroundJobsTickerQModule.cs
  8. 2
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs
  9. 76
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs
  10. 31
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireDynamicBackgroundWorkerAdapter.cs
  11. 77
      framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs
  12. 41
      framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzDynamicBackgroundWorkerAdapter.cs
  13. 69
      framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/AbpTickerQBackgroundWorkerManager.cs
  14. 60
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs
  15. 16
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DynamicBackgroundWorkerExecutionContext.cs
  16. 43
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DynamicBackgroundWorkerHandlerRegistry.cs
  17. 10
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DynamicBackgroundWorkerSchedule.cs
  18. 20
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IBackgroundWorkerManager.cs
  19. 16
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IDynamicBackgroundWorkerHandlerRegistry.cs
  20. 40
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/InMemoryDynamicBackgroundWorker.cs
  21. 64
      framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs

27
docs/en/framework/infrastructure/background-workers/index.md

@ -120,6 +120,33 @@ So, it resolves the given background worker and adds to the `IBackgroundWorkerMa
While we generally add workers in `OnApplicationInitializationAsync`, there are no restrictions on that. You can inject `IBackgroundWorkerManager` anywhere and add workers at runtime. Background worker manager will stop and release all the registered workers when your application is being shut down.
### Add Dynamic Workers at Runtime (Handler in Add)
You can add a runtime worker without pre-defining a dedicated worker class by passing a handler directly to `AddAsync`.
```csharp
await backgroundWorkerManager.AddAsync(
"InventorySyncWorker",
new DynamicBackgroundWorkerSchedule
{
Period = 30000 // 30 seconds
// CronExpression = "*/30 * * * * *" // optional (provider dependent)
},
async (context, cancellationToken) =>
{
var inventorySyncAppService = context.ServiceProvider.GetRequiredService<IInventorySyncAppService>();
await inventorySyncAppService.SyncAsync(cancellationToken);
}
);
```
Key points:
* `workerName` is the runtime identifier of the dynamic worker.
* The `handler` is registered at runtime and executed through the provider-specific worker manager.
* Provider behavior is preserved. For example, providers with persistent schedulers keep their own scheduling semantics.
* The default in-process manager uses in-memory periodic execution.
## Options
`AbpBackgroundWorkerOptions` class is used to [set options](../../fundamentals/options.md) for the background workers. Currently, there is only one option:

2
framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobConfiguration.cs

@ -1,4 +1,4 @@
using System;
using System;
namespace Volo.Abp.BackgroundJobs;

2
framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs

@ -1,4 +1,4 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using System;

2
framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs

@ -1,4 +1,4 @@
using System;
using System;
using System.Threading;
using Volo.Abp.DependencyInjection;

2
framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs

@ -1,4 +1,4 @@
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using Microsoft.Extensions.DependencyInjection;

2
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;

2
framework/src/Volo.Abp.BackgroundJobs.TickerQ/Volo/Abp/BackgroundJobs/TickerQ/AbpBackgroundJobsTickerQModule.cs

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;

2
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs

@ -1,4 +1,4 @@
using System;
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

76
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

@ -9,6 +9,7 @@ using Hangfire.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
using Volo.Abp.Hangfire;
@ -20,11 +21,12 @@ namespace Volo.Abp.BackgroundWorkers.Hangfire;
public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
protected AbpHangfireBackgroundJobServer BackgroundJobServer { get; set; } = default!;
protected IServiceProvider ServiceProvider { get; }
public HangfireBackgroundWorkerManager(IServiceProvider serviceProvider)
public HangfireBackgroundWorkerManager(
IServiceProvider serviceProvider,
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry)
: base(serviceProvider, dynamicBackgroundWorkerHandlerRegistry)
{
ServiceProvider = serviceProvider;
}
public void Initialize()
@ -137,6 +139,74 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
}
}
public override Task AddAsync(
string workerName,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
return AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = DynamicBackgroundWorkerSchedule.DefaultPeriod
},
handler,
cancellationToken
);
}
public override Task AddAsync(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
Check.NotNull(handler, nameof(handler));
DynamicBackgroundWorkerHandlerRegistry.Register(workerName, handler);
var cronExpression = schedule.CronExpression;
if (cronExpression.IsNullOrWhiteSpace())
{
var period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod;
cronExpression = GetCron(period);
}
var logger = ServiceProvider.GetRequiredService<ILogger<HangfireBackgroundWorkerManager>>();
var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
var queueName = abpHangfireOptions.DefaultQueue;
var recurringJobId = $"DynamicWorker:{workerName}";
if (!JobStorage.Current.HasFeature(JobStorageFeatures.JobQueueProperty))
{
logger.LogError($"Current storage doesn't support specifying queues({queueName}) directly for a specific job. Please use the QueueAttribute instead.");
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>(
recurringJobId,
adapter => adapter.DoWorkAsync(workerName, cancellationToken),
cronExpression,
new RecurringJobOptions
{
TimeZone = TimeZoneInfo.Utc
});
}
else
{
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>(
recurringJobId,
queueName,
adapter => adapter.DoWorkAsync(workerName, cancellationToken),
cronExpression,
new RecurringJobOptions
{
TimeZone = TimeZoneInfo.Utc
});
}
return Task.CompletedTask;
}
private static readonly MethodInfo? GetRecurringJobIdMethodInfo = typeof(RecurringJob).GetMethod("GetRecurringJobId", BindingFlags.NonPublic | BindingFlags.Static);
protected virtual string? GetRecurringJobId(IBackgroundWorker worker, Expression<Func<Task>> methodCall)
{

31
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireDynamicBackgroundWorkerAdapter.cs

@ -0,0 +1,31 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundWorkers.Hangfire;
public class HangfireDynamicBackgroundWorkerAdapter : ITransientDependency
{
protected IDynamicBackgroundWorkerHandlerRegistry DynamicBackgroundWorkerHandlerRegistry { get; }
protected IServiceProvider ServiceProvider { get; }
public HangfireDynamicBackgroundWorkerAdapter(
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry,
IServiceProvider serviceProvider)
{
DynamicBackgroundWorkerHandlerRegistry = dynamicBackgroundWorkerHandlerRegistry;
ServiceProvider = serviceProvider;
}
public virtual async Task DoWorkAsync(string workerName, CancellationToken cancellationToken = default)
{
var handler = DynamicBackgroundWorkerHandlerRegistry.Get(workerName);
if (handler == null)
{
return;
}
await handler(new DynamicBackgroundWorkerExecutionContext(workerName, ServiceProvider), cancellationToken);
}
}

77
framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs

@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Quartz;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
@ -10,9 +11,15 @@ namespace Volo.Abp.BackgroundWorkers.Quartz;
[Dependency(ReplaceServices = true)]
public class QuartzBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
public const string DynamicWorkerNameKey = "AbpDynamicWorkerName";
protected IScheduler Scheduler { get; }
public QuartzBackgroundWorkerManager(IScheduler scheduler)
public QuartzBackgroundWorkerManager(
IScheduler scheduler,
IServiceProvider serviceProvider,
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry)
: base(serviceProvider, dynamicBackgroundWorkerHandlerRegistry)
{
Scheduler = scheduler;
}
@ -96,4 +103,72 @@ public class QuartzBackgroundWorkerManager : BackgroundWorkerManager, ISingleton
await Scheduler.ScheduleJob(quartzWork.JobDetail, quartzWork.Trigger, cancellationToken);
}
}
public override Task AddAsync(
string workerName,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
return AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = DynamicBackgroundWorkerSchedule.DefaultPeriod
},
handler,
cancellationToken
);
}
public override async Task AddAsync(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
Check.NotNull(handler, nameof(handler));
DynamicBackgroundWorkerHandlerRegistry.Register(workerName, handler);
if (schedule.Period == null && schedule.CronExpression.IsNullOrWhiteSpace())
{
throw new AbpException($"Both 'Period' and 'CronExpression' are not set for dynamic worker {workerName}. You must set at least one of them.");
}
var jobKey = new JobKey($"DynamicWorker:{workerName}");
var triggerKey = new TriggerKey($"DynamicWorker:{workerName}");
var jobDetail = JobBuilder.Create<QuartzDynamicBackgroundWorkerAdapter>()
.WithIdentity(jobKey)
.UsingJobData(DynamicWorkerNameKey, workerName)
.Build();
var triggerBuilder = TriggerBuilder.Create()
.ForJob(jobDetail)
.WithIdentity(triggerKey);
if (!schedule.CronExpression.IsNullOrWhiteSpace())
{
triggerBuilder.WithCronSchedule(schedule.CronExpression);
}
else
{
triggerBuilder.WithSimpleSchedule(builder =>
builder.WithInterval(TimeSpan.FromMilliseconds(schedule.Period!.Value)).RepeatForever());
}
var trigger = triggerBuilder.Build();
if (await Scheduler.CheckExists(jobDetail.Key, cancellationToken))
{
await Scheduler.AddJob(jobDetail, true, true, cancellationToken);
await Scheduler.ResumeJob(jobDetail.Key, cancellationToken);
await Scheduler.RescheduleJob(trigger.Key, trigger, cancellationToken);
}
else
{
await Scheduler.ScheduleJob(jobDetail, trigger, cancellationToken);
}
}
}

41
framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzDynamicBackgroundWorkerAdapter.cs

@ -0,0 +1,41 @@
using System;
using System.Threading.Tasks;
using Quartz;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundWorkers.Quartz;
public class QuartzDynamicBackgroundWorkerAdapter : IJob, ITransientDependency
{
protected IDynamicBackgroundWorkerHandlerRegistry DynamicBackgroundWorkerHandlerRegistry { get; }
protected IServiceProvider ServiceProvider { get; }
public QuartzDynamicBackgroundWorkerAdapter(
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry,
IServiceProvider serviceProvider)
{
DynamicBackgroundWorkerHandlerRegistry = dynamicBackgroundWorkerHandlerRegistry;
ServiceProvider = serviceProvider;
}
public virtual async Task Execute(IJobExecutionContext context)
{
var workerName = context.MergedJobDataMap.GetString(QuartzBackgroundWorkerManager.DynamicWorkerNameKey);
if (string.IsNullOrWhiteSpace(workerName))
{
return;
}
var nonNullWorkerName = workerName!;
var handler = DynamicBackgroundWorkerHandlerRegistry.Get(nonNullWorkerName);
if (handler == null)
{
return;
}
await handler(
new DynamicBackgroundWorkerExecutionContext(nonNullWorkerName, ServiceProvider),
context.CancellationToken
);
}
}

69
framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/AbpTickerQBackgroundWorkerManager.cs

@ -1,8 +1,11 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using TickerQ.Utilities.Entities;
using TickerQ.Utilities.Enums;
using TickerQ.Utilities.Interfaces.Managers;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
using Volo.Abp.TickerQ;
@ -14,15 +17,21 @@ public class AbpTickerQBackgroundWorkerManager : BackgroundWorkerManager, ISingl
{
protected AbpTickerQFunctionProvider AbpTickerQFunctionProvider { get; }
protected AbpTickerQBackgroundWorkersProvider AbpTickerQBackgroundWorkersProvider { get; }
protected ICronTickerManager<CronTickerEntity> CronTickerManager { get; }
protected AbpBackgroundWorkersTickerQOptions Options { get; }
public AbpTickerQBackgroundWorkerManager(
AbpTickerQFunctionProvider abpTickerQFunctionProvider,
AbpTickerQBackgroundWorkersProvider abpTickerQBackgroundWorkersProvider,
ICronTickerManager<CronTickerEntity> cronTickerManager,
IServiceProvider serviceProvider,
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry,
IOptions<AbpBackgroundWorkersTickerQOptions> options)
: base(serviceProvider, dynamicBackgroundWorkerHandlerRegistry)
{
AbpTickerQFunctionProvider = abpTickerQFunctionProvider;
AbpTickerQBackgroundWorkersProvider = abpTickerQBackgroundWorkersProvider;
CronTickerManager = cronTickerManager;
Options = options.Value;
}
@ -70,6 +79,66 @@ public class AbpTickerQBackgroundWorkerManager : BackgroundWorkerManager, ISingl
await base.AddAsync(worker, cancellationToken);
}
public override Task AddAsync(
string workerName,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
return AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = DynamicBackgroundWorkerSchedule.DefaultPeriod
},
handler,
cancellationToken
);
}
public override async Task AddAsync(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
Check.NotNull(handler, nameof(handler));
DynamicBackgroundWorkerHandlerRegistry.Register(workerName, handler);
var cronExpression = schedule.CronExpression ?? GetCron(schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod);
var functionName = $"DynamicWorker:{workerName}";
AbpTickerQFunctionProvider.Functions[functionName] =
(string.Empty, TickerTaskPriority.LongRunning, async (tickerCancellationToken, serviceProvider, _) =>
{
var registeredHandler = DynamicBackgroundWorkerHandlerRegistry.Get(workerName);
if (registeredHandler == null)
{
return;
}
await registeredHandler(
new DynamicBackgroundWorkerExecutionContext(workerName, serviceProvider),
tickerCancellationToken
);
});
AbpTickerQBackgroundWorkersProvider.BackgroundWorkers[functionName] = new AbpTickerQCronBackgroundWorker
{
Function = functionName,
CronExpression = cronExpression,
WorkerType = typeof(AbpTickerQBackgroundWorkerManager)
};
await CronTickerManager.AddAsync(new CronTickerEntity
{
Function = functionName,
Expression = cronExpression
});
}
protected virtual string GetCron(int period)
{
var time = TimeSpan.FromMilliseconds(period);

60
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs

@ -1,8 +1,10 @@
using System;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers;
@ -16,13 +18,19 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
private bool _isDisposed;
private readonly List<IBackgroundWorker> _backgroundWorkers;
protected IServiceProvider ServiceProvider { get; }
protected IDynamicBackgroundWorkerHandlerRegistry DynamicBackgroundWorkerHandlerRegistry { get; }
/// <summary>
/// Initializes a new instance of the <see cref="BackgroundWorkerManager"/> class.
/// </summary>
public BackgroundWorkerManager()
public BackgroundWorkerManager(
IServiceProvider serviceProvider,
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry)
{
_backgroundWorkers = new List<IBackgroundWorker>();
ServiceProvider = serviceProvider;
DynamicBackgroundWorkerHandlerRegistry = dynamicBackgroundWorkerHandlerRegistry;
}
public virtual async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
@ -35,6 +43,54 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
}
}
public virtual Task AddAsync(
string workerName,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
return AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = DynamicBackgroundWorkerSchedule.DefaultPeriod
},
handler,
cancellationToken
);
}
public virtual async Task AddAsync(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
Check.NotNull(handler, nameof(handler));
DynamicBackgroundWorkerHandlerRegistry.Register(workerName, handler);
if (schedule.Period == null && !string.IsNullOrWhiteSpace(schedule.CronExpression))
{
throw new AbpException("Default background worker manager does not support cron expression without period.");
}
var timer = ServiceProvider.GetRequiredService<AbpAsyncTimer>();
var serviceScopeFactory = ServiceProvider.GetRequiredService<IServiceScopeFactory>();
var worker = new InMemoryDynamicBackgroundWorker(
workerName,
schedule,
timer,
serviceScopeFactory,
DynamicBackgroundWorkerHandlerRegistry
);
worker.ServiceProvider = ServiceProvider;
worker.LazyServiceProvider = ServiceProvider.GetRequiredService<IAbpLazyServiceProvider>();
await AddAsync(worker, cancellationToken);
}
public virtual void Dispose()
{
if (_isDisposed)

16
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DynamicBackgroundWorkerExecutionContext.cs

@ -0,0 +1,16 @@
using System;
namespace Volo.Abp.BackgroundWorkers;
public class DynamicBackgroundWorkerExecutionContext
{
public string WorkerName { get; }
public IServiceProvider ServiceProvider { get; }
public DynamicBackgroundWorkerExecutionContext(string workerName, IServiceProvider serviceProvider)
{
WorkerName = Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
ServiceProvider = Check.NotNull(serviceProvider, nameof(serviceProvider));
}
}

43
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DynamicBackgroundWorkerHandlerRegistry.cs

@ -0,0 +1,43 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.BackgroundWorkers;
public class DynamicBackgroundWorkerHandlerRegistry : IDynamicBackgroundWorkerHandlerRegistry, ISingletonDependency
{
protected ConcurrentDictionary<string, Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task>> Handlers { get; }
public DynamicBackgroundWorkerHandlerRegistry()
{
Handlers = new ConcurrentDictionary<string, Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task>>();
}
public virtual void Register(string workerName, Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(handler, nameof(handler));
Handlers[workerName] = handler;
}
public virtual bool Unregister(string workerName)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
return Handlers.TryRemove(workerName, out _);
}
public virtual bool IsRegistered(string workerName)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
return Handlers.ContainsKey(workerName);
}
public virtual Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task>? Get(string workerName)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
return Handlers.TryGetValue(workerName, out var handler) ? handler : null;
}
}

10
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DynamicBackgroundWorkerSchedule.cs

@ -0,0 +1,10 @@
namespace Volo.Abp.BackgroundWorkers;
public class DynamicBackgroundWorkerSchedule
{
public const int DefaultPeriod = 60000;
public int? Period { get; set; }
public string? CronExpression { get; set; }
}

20
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IBackgroundWorkerManager.cs

@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Threading;
@ -17,4 +18,21 @@ public interface IBackgroundWorkerManager : IRunnable
/// </param>
/// <param name="cancellationToken"></param>
Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default);
/// <summary>
/// Adds a dynamic worker by name and handler.
/// </summary>
Task AddAsync(
string workerName,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default);
/// <summary>
/// Adds a dynamic worker by name, schedule and handler.
/// </summary>
Task AddAsync(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default);
}

16
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IDynamicBackgroundWorkerHandlerRegistry.cs

@ -0,0 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.BackgroundWorkers;
public interface IDynamicBackgroundWorkerHandlerRegistry
{
void Register(string workerName, Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler);
bool Unregister(string workerName);
bool IsRegistered(string workerName);
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task>? Get(string workerName);
}

40
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/InMemoryDynamicBackgroundWorker.cs

@ -0,0 +1,40 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers;
public class InMemoryDynamicBackgroundWorker : AsyncPeriodicBackgroundWorkerBase
{
protected string WorkerName { get; }
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; }
public InMemoryDynamicBackgroundWorker(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
AbpAsyncTimer timer,
IServiceScopeFactory serviceScopeFactory,
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry)
: base(timer, serviceScopeFactory)
{
WorkerName = Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
HandlerRegistry = Check.NotNull(handlerRegistry, nameof(handlerRegistry));
Timer.Period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod;
CronExpression = schedule.CronExpression;
}
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
var handler = HandlerRegistry.Get(WorkerName);
if (handler == null)
{
Logger.LogWarning("No dynamic background worker handler registered for: {WorkerName}", WorkerName);
return;
}
await handler(new DynamicBackgroundWorkerExecutionContext(WorkerName, workerContext.ServiceProvider), workerContext.CancellationToken);
}
}

64
framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs

@ -0,0 +1,64 @@
using System;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.BackgroundWorkers;
using Xunit;
namespace Volo.Abp.BackgroundJobs;
public class DynamicBackgroundWorkerManager_Tests : BackgroundJobsTestBase
{
private readonly IBackgroundWorkerManager _backgroundWorkerManager;
private readonly IDynamicBackgroundWorkerHandlerRegistry _handlerRegistry;
public DynamicBackgroundWorkerManager_Tests()
{
_backgroundWorkerManager = GetRequiredService<IBackgroundWorkerManager>();
_handlerRegistry = GetRequiredService<IDynamicBackgroundWorkerHandlerRegistry>();
}
[Fact]
public async Task Should_Register_Dynamic_Handler_When_Added()
{
var workerName = "dynamic-worker-" + Guid.NewGuid();
await _backgroundWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = 1000
},
(_, _) => Task.CompletedTask
);
_handlerRegistry.IsRegistered(workerName).ShouldBeTrue();
}
[Fact]
public async Task Should_Execute_Dynamic_Handler()
{
var workerName = "dynamic-worker-" + Guid.NewGuid();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
await _backgroundWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = 50
},
(context, _) =>
{
if (context.WorkerName == workerName)
{
tcs.TrySetResult(true);
}
return Task.CompletedTask;
}
);
var completedTask = await Task.WhenAny(tcs.Task, Task.Delay(5000));
completedTask.ShouldBe(tcs.Task);
(await tcs.Task).ShouldBeTrue();
}
}
Loading…
Cancel
Save