mirror of https://github.com/abpframework/abp.git
committed by
GitHub
41 changed files with 2480 additions and 19 deletions
@ -0,0 +1,224 @@ |
|||
# Dynamic Background Jobs and Workers in ABP |
|||
|
|||
ABP's Background Jobs and Background Workers are two well-established infrastructure pieces. Background jobs handle fire-and-forget async tasks — sending emails, generating reports, processing orders. Background workers handle continuously running periodic tasks — syncing inventory, cleaning up expired data, pushing scheduled notifications. |
|||
|
|||
This works great, but it has one assumption: **you know all your job and worker types at compile time**. |
|||
|
|||
In practice, that assumption breaks down more often than you'd expect: |
|||
|
|||
- You're building a **plugin system** where third-party plugins need to register their own background processing logic at runtime — you can't pre-define an `IBackgroundJob<TArgs>` implementation in the host project for every possible plugin |
|||
- Your system needs to execute background tasks based on **external configuration** (database, API responses) — the task types and parameters are entirely unknown at compile time |
|||
- Your **multi-tenant SaaS platform** needs different sync intervals for different tenants — some every 30 seconds, some every 5 minutes — and you need to adjust these without restarting the application |
|||
- You're building a **low-code/no-code platform** where end users define automation workflows through a visual designer, and those workflows need to run as background jobs or scheduled tasks — the job types and scheduling parameters are entirely determined by end users at runtime, unknowable to developers at compile time |
|||
|
|||
ABP's **Dynamic Background Jobs** (`IDynamicBackgroundJobManager`) and **Dynamic Background Workers** (`IDynamicBackgroundWorkerManager`) are designed for exactly these scenarios. They let you register, enqueue, schedule, and manage background tasks by name at runtime, with no compile-time type binding required. |
|||
|
|||
## Dynamic Background Jobs |
|||
|
|||
`IDynamicBackgroundJobManager` offers two usage patterns, covering different levels of runtime flexibility. |
|||
|
|||
### Enqueue an Existing Typed Job by Name |
|||
|
|||
If you already have a typed background job (say, an `EmailSendingJob` registered via `[BackgroundJobName("emails")]`), you can enqueue it by name without referencing its args type: |
|||
|
|||
```csharp |
|||
public class OrderAppService : ApplicationService |
|||
{ |
|||
private readonly IDynamicBackgroundJobManager _dynamicJobManager; |
|||
|
|||
public OrderAppService(IDynamicBackgroundJobManager dynamicJobManager) |
|||
{ |
|||
_dynamicJobManager = dynamicJobManager; |
|||
} |
|||
|
|||
public async Task PlaceOrderAsync(PlaceOrderInput input) |
|||
{ |
|||
// Business logic... |
|||
|
|||
// Enqueue a confirmation email — no reference to EmailSendingJobArgs needed |
|||
await _dynamicJobManager.EnqueueAsync("emails", new |
|||
{ |
|||
EmailAddress = input.CustomerEmail, |
|||
Subject = "Order Confirmed", |
|||
Body = $"Your order {input.OrderId} has been placed." |
|||
}); |
|||
} |
|||
} |
|||
``` |
|||
|
|||
The framework looks up the typed job configuration by name, serializes the anonymous object, deserializes it into the correct args type, and feeds it through the standard typed job pipeline. The caller doesn't need to `using` any specific project namespace. |
|||
|
|||
### Register a Runtime Dynamic Handler |
|||
|
|||
When you don't even have a job type — say a plugin decides at startup what processing logic to register — you can register a handler directly: |
|||
|
|||
```csharp |
|||
public override async Task OnApplicationInitializationAsync( |
|||
ApplicationInitializationContext context) |
|||
{ |
|||
var dynamicJobManager = context.ServiceProvider |
|||
.GetRequiredService<IDynamicBackgroundJobManager>(); |
|||
|
|||
// A plugin registers its own processing logic at startup |
|||
dynamicJobManager.RegisterHandler("SyncExternalCatalog", async (jobContext, ct) => |
|||
{ |
|||
using var doc = JsonDocument.Parse(jobContext.JsonData); |
|||
var catalogUrl = doc.RootElement.GetProperty("url").GetString(); |
|||
|
|||
var httpClient = jobContext.ServiceProvider |
|||
.GetRequiredService<IHttpClientFactory>() |
|||
.CreateClient(); |
|||
|
|||
var catalog = await httpClient.GetStringAsync(catalogUrl, ct); |
|||
// Process catalog data... |
|||
}); |
|||
|
|||
// Now you can enqueue jobs for this handler |
|||
await dynamicJobManager.EnqueueAsync("SyncExternalCatalog", new |
|||
{ |
|||
Url = "https://partner-api.example.com/catalog" |
|||
}); |
|||
} |
|||
``` |
|||
|
|||
The handler receives a context object containing `JsonData` (the raw JSON string) and `ServiceProvider` (a scoped container). Resolving dependencies from `ServiceProvider` is the recommended approach — avoid capturing external state in the handler closure. |
|||
|
|||
There's one priority rule to keep in mind: **if a name matches both a typed job and a dynamic handler, the typed job wins**. Dynamic handlers never accidentally override existing typed jobs. |
|||
|
|||
> Dynamic jobs ultimately go through the standard typed job pipeline, so they **work with every background job provider** — Default, Hangfire, Quartz, RabbitMQ, TickerQ — without any provider-specific code. |
|||
|
|||
## Dynamic Background Workers |
|||
|
|||
`IDynamicBackgroundWorkerManager` lets you register periodic tasks at runtime and manage their full lifecycle: add, remove, update schedule. |
|||
|
|||
```csharp |
|||
public override async Task OnApplicationInitializationAsync( |
|||
ApplicationInitializationContext context) |
|||
{ |
|||
var workerManager = context.ServiceProvider |
|||
.GetRequiredService<IDynamicBackgroundWorkerManager>(); |
|||
|
|||
await workerManager.AddAsync( |
|||
"InventorySyncWorker", |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = 30000 // 30 seconds |
|||
}, |
|||
async (workerContext, cancellationToken) => |
|||
{ |
|||
var syncService = workerContext.ServiceProvider |
|||
.GetRequiredService<IInventorySyncAppService>(); |
|||
|
|||
await syncService.SyncAsync(cancellationToken); |
|||
} |
|||
); |
|||
} |
|||
``` |
|||
|
|||
If you're using Hangfire or Quartz as your provider, you can use a cron expression instead of a fixed interval: |
|||
|
|||
```csharp |
|||
await workerManager.AddAsync( |
|||
"DailyReportWorker", |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
CronExpression = "0 2 * * *" // Every day at 2:00 AM |
|||
}, |
|||
async (workerContext, cancellationToken) => |
|||
{ |
|||
var reportService = workerContext.ServiceProvider |
|||
.GetRequiredService<IReportAppService>(); |
|||
|
|||
await reportService.GenerateDailyReportAsync(cancellationToken); |
|||
} |
|||
); |
|||
``` |
|||
|
|||
### Runtime Schedule Management |
|||
|
|||
Adding a worker is just the beginning. The real value of dynamic workers is that the entire lifecycle is controllable at runtime: |
|||
|
|||
```csharp |
|||
// Check if a worker is currently registered |
|||
bool exists = workerManager.IsRegistered("InventorySyncWorker"); |
|||
|
|||
// A tenant upgrades their plan — speed up sync from 30s to 10s |
|||
await workerManager.UpdateScheduleAsync( |
|||
"InventorySyncWorker", |
|||
new DynamicBackgroundWorkerSchedule { Period = 10000 } |
|||
); |
|||
|
|||
// Tenant disables the sync feature — remove the worker entirely |
|||
await workerManager.RemoveAsync("InventorySyncWorker"); |
|||
``` |
|||
|
|||
`UpdateScheduleAsync` only changes the schedule — the handler itself stays the same. For persistent providers like Hangfire and Quartz, `UpdateScheduleAsync` and `RemoveAsync` can operate on the persistent scheduling record even after an application restart, when the handler is no longer in memory. |
|||
|
|||
### Stopping All Workers |
|||
|
|||
When you need to stop all dynamic workers at once (e.g., as part of a graceful shutdown), call `StopAllAsync`: |
|||
|
|||
```csharp |
|||
await workerManager.StopAllAsync(cancellationToken); |
|||
``` |
|||
|
|||
All registered workers are stopped and cleaned up, and the handler registry is cleared. Calling `AddAsync` or `UpdateScheduleAsync` after this throws `ObjectDisposedException` — this is intentional, preventing new workers from being added during a shutdown sequence. |
|||
|
|||
## Provider Support |
|||
|
|||
Dynamic background jobs and dynamic background workers have different levels of provider support. |
|||
|
|||
**Dynamic background jobs** are compatible with all providers because they reuse the standard typed job pipeline: |
|||
|
|||
| Provider | Supported | |
|||
|---|---| |
|||
| Default (In-Memory) | ✅ | |
|||
| Hangfire | ✅ | |
|||
| Quartz | ✅ | |
|||
| RabbitMQ | ✅ | |
|||
| TickerQ | ✅ | |
|||
|
|||
**Dynamic background workers** have per-provider implementations: |
|||
|
|||
| Provider | AddAsync | RemoveAsync | UpdateScheduleAsync | Period | CronExpression | |
|||
|---|---|---|---|---|---| |
|||
| Default (In-Memory) | ✅ | ✅ | ✅ | ✅ | ❌ | |
|||
| Hangfire | ✅ | ✅ | ✅ | ✅ | ✅ | |
|||
| Quartz | ✅ | ✅ | ✅ | ✅ | ✅ | |
|||
| TickerQ | ❌ | ❌ | ❌ | — | — | |
|||
|
|||
TickerQ uses `FrozenDictionary` for function registration, which requires all functions to be registered before the application starts. Runtime dynamic registration is not possible. |
|||
|
|||
## Restart Behavior |
|||
|
|||
Dynamic handlers are stored **in memory** and are not persisted across application restarts. This is a deliberate design choice — handlers are code logic (delegates), and code logic is inherently not serializable. |
|||
|
|||
For persistent providers (Hangfire, Quartz), this means: enqueued jobs and recurring job entries survive a restart in the database, but the handlers need to be re-registered. If a handler is not re-registered, the job executor throws an exception (background jobs) or skips the execution with a warning log (background workers). |
|||
|
|||
The recommended approach is to register handlers in `OnApplicationInitializationAsync`, so they are automatically restored on every startup: |
|||
|
|||
```csharp |
|||
public override async Task OnApplicationInitializationAsync( |
|||
ApplicationInitializationContext context) |
|||
{ |
|||
var dynamicJobManager = context.ServiceProvider |
|||
.GetRequiredService<IDynamicBackgroundJobManager>(); |
|||
|
|||
// Re-registered on every startup — persistent jobs will find their handler |
|||
dynamicJobManager.RegisterHandler("SyncExternalCatalog", async (jobContext, ct) => |
|||
{ |
|||
// handler logic... |
|||
}); |
|||
} |
|||
``` |
|||
|
|||
## Summary |
|||
|
|||
`IDynamicBackgroundJobManager` lets you enqueue jobs and register handlers by name at runtime, compatible with all background job providers, no compile-time types required. `IDynamicBackgroundWorkerManager` lets you add, remove, and update the schedule of periodic workers at runtime — Hangfire and Quartz providers also support cron expressions. Register handlers in `OnApplicationInitializationAsync` to ensure automatic recovery on every startup. |
|||
|
|||
## References |
|||
|
|||
- [Background Jobs](https://abp.io/docs/latest/framework/infrastructure/background-jobs) |
|||
- [Background Workers](https://abp.io/docs/latest/framework/infrastructure/background-workers) |
|||
- [Hangfire Background Job Manager](https://abp.io/docs/latest/framework/infrastructure/background-jobs/hangfire) |
|||
- [Quartz Background Job Manager](https://abp.io/docs/latest/framework/infrastructure/background-jobs/quartz) |
|||
|
After Width: | Height: | Size: 95 KiB |
@ -0,0 +1,144 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Linq; |
|||
using System.Linq.Expressions; |
|||
using System.Reflection; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Json; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
public class DefaultDynamicBackgroundJobManager : IDynamicBackgroundJobManager, ITransientDependency |
|||
{ |
|||
private static readonly ConcurrentDictionary<Type, Func<IBackgroundJobManager, object, BackgroundJobPriority, TimeSpan?, Task<string>>> EnqueueDelegateCache = new(); |
|||
|
|||
protected IBackgroundJobManager BackgroundJobManager { get; } |
|||
protected IDynamicBackgroundJobHandlerRegistry HandlerRegistry { get; } |
|||
protected AbpBackgroundJobOptions BackgroundJobOptions { get; } |
|||
protected IJsonSerializer JsonSerializer { get; } |
|||
public ILogger<DefaultDynamicBackgroundJobManager> Logger { get; set; } |
|||
|
|||
public DefaultDynamicBackgroundJobManager( |
|||
IBackgroundJobManager backgroundJobManager, |
|||
IDynamicBackgroundJobHandlerRegistry handlerRegistry, |
|||
IOptions<AbpBackgroundJobOptions> backgroundJobOptions, |
|||
IJsonSerializer jsonSerializer) |
|||
{ |
|||
BackgroundJobManager = backgroundJobManager; |
|||
HandlerRegistry = handlerRegistry; |
|||
BackgroundJobOptions = backgroundJobOptions.Value; |
|||
JsonSerializer = jsonSerializer; |
|||
Logger = NullLogger<DefaultDynamicBackgroundJobManager>.Instance; |
|||
} |
|||
|
|||
public virtual async Task<string> EnqueueAsync( |
|||
string jobName, |
|||
object args, |
|||
BackgroundJobPriority priority = BackgroundJobPriority.Normal, |
|||
TimeSpan? delay = null) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
Check.NotNull(args, nameof(args)); |
|||
|
|||
var jobConfiguration = BackgroundJobOptions.GetJobOrNull(jobName); |
|||
if (jobConfiguration != null) |
|||
{ |
|||
return await EnqueueTypedJobAsync(jobConfiguration, args, priority, delay); |
|||
} |
|||
|
|||
if (HandlerRegistry.IsRegistered(jobName)) |
|||
{ |
|||
return await EnqueueDynamicHandlerJobAsync(jobName, args, priority, delay); |
|||
} |
|||
|
|||
throw new AbpException( |
|||
$"No typed job configuration or dynamic handler registered for job name: {jobName}"); |
|||
} |
|||
|
|||
public virtual void RegisterHandler( |
|||
string jobName, |
|||
DynamicBackgroundJobHandler handler) |
|||
{ |
|||
HandlerRegistry.Register(jobName, handler); |
|||
} |
|||
|
|||
public virtual bool UnregisterHandler(string jobName) |
|||
{ |
|||
return HandlerRegistry.Unregister(jobName); |
|||
} |
|||
|
|||
public virtual bool IsHandlerRegistered(string jobName) |
|||
{ |
|||
return HandlerRegistry.IsRegistered(jobName); |
|||
} |
|||
|
|||
protected virtual async Task<string> EnqueueTypedJobAsync( |
|||
BackgroundJobConfiguration jobConfiguration, |
|||
object args, |
|||
BackgroundJobPriority priority, |
|||
TimeSpan? delay) |
|||
{ |
|||
var argsType = jobConfiguration.ArgsType; |
|||
|
|||
// Normalize args to the expected type via JSON round-trip
|
|||
var json = JsonSerializer.Serialize(args); |
|||
var typedArgs = JsonSerializer.Deserialize(argsType, json); |
|||
|
|||
var enqueueDelegate = GetOrCreateEnqueueDelegate(argsType); |
|||
return await enqueueDelegate(BackgroundJobManager, typedArgs, priority, delay); |
|||
} |
|||
|
|||
protected virtual Task<string> EnqueueDynamicHandlerJobAsync( |
|||
string jobName, |
|||
object args, |
|||
BackgroundJobPriority priority, |
|||
TimeSpan? delay) |
|||
{ |
|||
var jsonData = JsonSerializer.Serialize(args); |
|||
var dynamicArgs = new DynamicBackgroundJobArgs(jobName, jsonData); |
|||
return BackgroundJobManager.EnqueueAsync(dynamicArgs, priority, delay); |
|||
} |
|||
|
|||
private static Func<IBackgroundJobManager, object, BackgroundJobPriority, TimeSpan?, Task<string>> GetOrCreateEnqueueDelegate(Type argsType) |
|||
{ |
|||
return EnqueueDelegateCache.GetOrAdd(argsType, static type => |
|||
{ |
|||
var method = typeof(IBackgroundJobManager) |
|||
.GetMethods(BindingFlags.Public | BindingFlags.Instance) |
|||
.FirstOrDefault(m => |
|||
m.Name == nameof(IBackgroundJobManager.EnqueueAsync) |
|||
&& m.IsGenericMethodDefinition |
|||
&& m.GetParameters() is { Length: 3 } p |
|||
&& p[1].ParameterType == typeof(BackgroundJobPriority) |
|||
&& p[2].ParameterType == typeof(TimeSpan?)); |
|||
|
|||
if (method == null) |
|||
{ |
|||
throw new AbpException( |
|||
$"Could not find the generic EnqueueAsync method on {nameof(IBackgroundJobManager)}."); |
|||
} |
|||
|
|||
var genericMethod = method.MakeGenericMethod(type); |
|||
|
|||
// Build: (manager, args, priority, delay) => manager.EnqueueAsync<TArgs>((TArgs)args, priority, delay)
|
|||
var managerParam = Expression.Parameter(typeof(IBackgroundJobManager), "manager"); |
|||
var argsParam = Expression.Parameter(typeof(object), "args"); |
|||
var priorityParam = Expression.Parameter(typeof(BackgroundJobPriority), "priority"); |
|||
var delayParam = Expression.Parameter(typeof(TimeSpan?), "delay"); |
|||
|
|||
var call = Expression.Call( |
|||
managerParam, |
|||
genericMethod, |
|||
Expression.Convert(argsParam, type), |
|||
priorityParam, |
|||
delayParam); |
|||
|
|||
return Expression.Lambda<Func<IBackgroundJobManager, object, BackgroundJobPriority, TimeSpan?, Task<string>>>( |
|||
call, managerParam, argsParam, priorityParam, delayParam).Compile(); |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
[BackgroundJobName(JobNameConstant)] |
|||
public class DynamicBackgroundJobArgs |
|||
{ |
|||
public const string JobNameConstant = "Abp.DynamicJob"; |
|||
|
|||
public string JobName { get; private set; } |
|||
|
|||
public string JsonData { get; private set; } |
|||
|
|||
// For serializers that require a parameterless constructor (e.g. System.Text.Json)
|
|||
private DynamicBackgroundJobArgs() |
|||
{ |
|||
JobName = string.Empty; |
|||
JsonData = string.Empty; |
|||
} |
|||
|
|||
public DynamicBackgroundJobArgs(string jobName, string jsonData) |
|||
{ |
|||
JobName = Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
JsonData = Check.NotNull(jsonData, nameof(jsonData)); |
|||
} |
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
using System; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
public class DynamicBackgroundJobExecutionContext |
|||
{ |
|||
public string JobName { get; } |
|||
|
|||
public string JsonData { get; } |
|||
|
|||
public IServiceProvider ServiceProvider { get; } |
|||
|
|||
public DynamicBackgroundJobExecutionContext( |
|||
string jobName, |
|||
string jsonData, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
JobName = Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
JsonData = Check.NotNull(jsonData, nameof(jsonData)); |
|||
ServiceProvider = Check.NotNull(serviceProvider, nameof(serviceProvider)); |
|||
} |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
public class DynamicBackgroundJobExecutorJob : AsyncBackgroundJob<DynamicBackgroundJobArgs>, ITransientDependency |
|||
{ |
|||
protected IDynamicBackgroundJobHandlerRegistry HandlerRegistry { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
|
|||
public DynamicBackgroundJobExecutorJob( |
|||
IDynamicBackgroundJobHandlerRegistry handlerRegistry, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
HandlerRegistry = handlerRegistry; |
|||
ServiceProvider = serviceProvider; |
|||
} |
|||
|
|||
public override async Task ExecuteAsync(DynamicBackgroundJobArgs args) |
|||
{ |
|||
Logger.LogDebug( |
|||
"Executing dynamic job. TransportJobName: {TransportJobName}, EffectiveJobName: {EffectiveJobName}", |
|||
DynamicBackgroundJobArgs.JobNameConstant, |
|||
args.JobName |
|||
); |
|||
|
|||
var handler = HandlerRegistry.Get(args.JobName); |
|||
if (handler == null) |
|||
{ |
|||
throw new AbpException( |
|||
$"No dynamic job handler registered for: {args.JobName}. " + |
|||
$"The handler may have been unregistered or the application restarted since the job was enqueued."); |
|||
} |
|||
|
|||
var cancellationToken = ServiceProvider.GetRequiredService<ICancellationTokenProvider>().Token; |
|||
var executionContext = new DynamicBackgroundJobExecutionContext(args.JobName, args.JsonData, ServiceProvider); |
|||
await handler(executionContext, cancellationToken); |
|||
} |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
/// <summary>
|
|||
/// Represents a handler delegate for dynamic background jobs.
|
|||
/// </summary>
|
|||
public delegate Task DynamicBackgroundJobHandler(DynamicBackgroundJobExecutionContext context, CancellationToken cancellationToken); |
|||
@ -0,0 +1,52 @@ |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
public class DynamicBackgroundJobHandlerRegistry : IDynamicBackgroundJobHandlerRegistry, ISingletonDependency |
|||
{ |
|||
protected ConcurrentDictionary<string, DynamicBackgroundJobHandler> Handlers { get; } |
|||
|
|||
public DynamicBackgroundJobHandlerRegistry() |
|||
{ |
|||
Handlers = new ConcurrentDictionary<string, DynamicBackgroundJobHandler>(); |
|||
} |
|||
|
|||
public virtual void Register(string jobName, DynamicBackgroundJobHandler handler) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
Handlers[jobName] = handler; |
|||
} |
|||
|
|||
public virtual bool Unregister(string jobName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
return Handlers.TryRemove(jobName, out _); |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string jobName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
return Handlers.ContainsKey(jobName); |
|||
} |
|||
|
|||
public virtual DynamicBackgroundJobHandler? Get(string jobName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(jobName, nameof(jobName)); |
|||
return Handlers.TryGetValue(jobName, out var handler) ? handler : null; |
|||
} |
|||
|
|||
public virtual IReadOnlyCollection<string> GetAllNames() |
|||
{ |
|||
return Handlers.Keys.ToList().AsReadOnly(); |
|||
} |
|||
|
|||
public virtual void Clear() |
|||
{ |
|||
Handlers.Clear(); |
|||
} |
|||
} |
|||
@ -0,0 +1,18 @@ |
|||
using System.Collections.Generic; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
public interface IDynamicBackgroundJobHandlerRegistry |
|||
{ |
|||
void Register(string jobName, DynamicBackgroundJobHandler handler); |
|||
|
|||
bool Unregister(string jobName); |
|||
|
|||
bool IsRegistered(string jobName); |
|||
|
|||
DynamicBackgroundJobHandler? Get(string jobName); |
|||
|
|||
IReadOnlyCollection<string> GetAllNames(); |
|||
|
|||
void Clear(); |
|||
} |
|||
@ -0,0 +1,51 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
/// <summary>
|
|||
/// Manages dynamic background jobs that can be enqueued by name
|
|||
/// without requiring a strongly-typed job args class at compile time.
|
|||
/// Also supports registering dynamic job handlers at runtime.
|
|||
/// </summary>
|
|||
public interface IDynamicBackgroundJobManager |
|||
{ |
|||
/// <summary>
|
|||
/// Enqueues a job by name with dynamic payload.
|
|||
/// If a typed job configuration exists for this name, the args will be
|
|||
/// deserialized to the configured args type and enqueued through the typed pipeline.
|
|||
/// If a dynamic handler is registered, the args will be wrapped as
|
|||
/// <see cref="DynamicBackgroundJobArgs"/> and enqueued through the standard typed pipeline.
|
|||
/// </summary>
|
|||
/// <param name="jobName">Name of the background job.</param>
|
|||
/// <param name="args">Job arguments (will be serialized to JSON).</param>
|
|||
/// <param name="priority">Job priority.</param>
|
|||
/// <param name="delay">Job delay (wait duration before first try).</param>
|
|||
/// <returns>Unique identifier of a background job.</returns>
|
|||
Task<string> EnqueueAsync( |
|||
string jobName, |
|||
object args, |
|||
BackgroundJobPriority priority = BackgroundJobPriority.Normal, |
|||
TimeSpan? delay = null); |
|||
|
|||
/// <summary>
|
|||
/// Registers a dynamic job handler at runtime.
|
|||
/// </summary>
|
|||
/// <param name="jobName">Unique name for the dynamic job.</param>
|
|||
/// <param name="handler">The handler delegate to execute when the job runs.</param>
|
|||
void RegisterHandler(string jobName, DynamicBackgroundJobHandler handler); |
|||
|
|||
/// <summary>
|
|||
/// Unregisters a previously registered dynamic job handler.
|
|||
/// </summary>
|
|||
/// <param name="jobName">Name of the dynamic job to unregister.</param>
|
|||
/// <returns>True if the handler was found and removed; false otherwise.</returns>
|
|||
bool UnregisterHandler(string jobName); |
|||
|
|||
/// <summary>
|
|||
/// Checks whether a dynamic handler is registered for the given job name.
|
|||
/// </summary>
|
|||
/// <param name="jobName">Name of the dynamic job.</param>
|
|||
/// <returns>True if registered; false otherwise.</returns>
|
|||
bool IsHandlerRegistered(string jobName); |
|||
} |
|||
@ -0,0 +1,51 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.ExceptionHandling; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Hangfire; |
|||
|
|||
public class HangfireDynamicBackgroundWorkerAdapter : ITransientDependency |
|||
{ |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
public ILogger<HangfireDynamicBackgroundWorkerAdapter> Logger { get; set; } |
|||
|
|||
public HangfireDynamicBackgroundWorkerAdapter( |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
HandlerRegistry = handlerRegistry; |
|||
ServiceProvider = serviceProvider; |
|||
Logger = NullLogger<HangfireDynamicBackgroundWorkerAdapter>.Instance; |
|||
} |
|||
|
|||
public virtual async Task DoWorkAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
var handler = HandlerRegistry.Get(workerName); |
|||
if (handler == null) |
|||
{ |
|||
Logger.LogWarning("No handler registered for dynamic worker: {WorkerName}", workerName); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
await handler(new DynamicBackgroundWorkerExecutionContext(workerName, ServiceProvider), cancellationToken); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
// Swallow the exception to match the behavior of AsyncPeriodicBackgroundWorkerBase,
|
|||
// which catches, notifies and logs without rethrowing. This prevents Hangfire from
|
|||
// treating a single failed execution as a job failure and triggering retries.
|
|||
await ServiceProvider.GetRequiredService<IExceptionNotifier>() |
|||
.NotifyAsync(new ExceptionNotificationContext(ex)); |
|||
|
|||
Logger.LogException(ex); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,183 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Hangfire; |
|||
using Hangfire.Storage; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Hangfire; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Hangfire; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class HangfireDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
public ILogger<HangfireDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
public HangfireDynamicBackgroundWorkerManager( |
|||
IServiceProvider serviceProvider, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
HandlerRegistry = handlerRegistry; |
|||
Logger = NullLogger<HangfireDynamicBackgroundWorkerManager>.Instance; |
|||
} |
|||
|
|||
public virtual Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var cronExpression = schedule.CronExpression; |
|||
if (cronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
var period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod; |
|||
cronExpression = GetCron(period); |
|||
} |
|||
|
|||
// Register the handler first so it is available the moment the recurring job fires.
|
|||
HandlerRegistry.Register(workerName, handler); |
|||
try |
|||
{ |
|||
ScheduleRecurringJob(workerName, cronExpression, cancellationToken); |
|||
} |
|||
catch |
|||
{ |
|||
HandlerRegistry.Unregister(workerName); |
|||
throw; |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
public virtual Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
// Always remove the persistent recurring job regardless of in-memory registry state.
|
|||
// This ensures cleanup works correctly after an application restart, when the registry
|
|||
// is empty but the Hangfire recurring job may still exist in the database.
|
|||
var recurringJobId = $"DynamicWorker:{workerName}"; |
|||
RecurringJob.RemoveIfExists(recurringJobId); |
|||
var wasRegistered = HandlerRegistry.Unregister(workerName); |
|||
|
|||
return Task.FromResult(wasRegistered); |
|||
} |
|||
|
|||
public virtual Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var cronExpression = schedule.CronExpression; |
|||
if (cronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
var period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod; |
|||
cronExpression = GetCron(period); |
|||
} |
|||
|
|||
// Always update the persistent recurring job regardless of in-memory registry state.
|
|||
// This ensures UpdateScheduleAsync works correctly after an application restart,
|
|||
// when the registry is empty but the Hangfire recurring job may still exist in the database.
|
|||
ScheduleRecurringJob(workerName, cronExpression, cancellationToken); |
|||
|
|||
return Task.FromResult(true); |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return HandlerRegistry.IsRegistered(workerName); |
|||
} |
|||
|
|||
public virtual Task StopAllAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
HandlerRegistry.Clear(); |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
protected virtual void ScheduleRecurringJob(string workerName, string cronExpression, CancellationToken cancellationToken) |
|||
{ |
|||
var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; |
|||
var queueName = abpHangfireOptions.DefaultQueue; |
|||
var recurringJobId = $"DynamicWorker:{workerName}"; |
|||
|
|||
if (!JobStorage.Current.HasFeature(JobStorageFeatures.JobQueueProperty)) |
|||
{ |
|||
Logger.LogWarning( |
|||
"Current storage doesn't support specifying queues ({QueueName}) directly for a specific job. Please use the QueueAttribute instead.", |
|||
queueName); |
|||
|
|||
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>( |
|||
recurringJobId, |
|||
adapter => adapter.DoWorkAsync(workerName, CancellationToken.None), |
|||
cronExpression, |
|||
new RecurringJobOptions |
|||
{ |
|||
TimeZone = TimeZoneInfo.Utc |
|||
}); |
|||
} |
|||
else |
|||
{ |
|||
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>( |
|||
recurringJobId, |
|||
queueName, |
|||
adapter => adapter.DoWorkAsync(workerName, CancellationToken.None), |
|||
cronExpression, |
|||
new RecurringJobOptions |
|||
{ |
|||
TimeZone = TimeZoneInfo.Utc |
|||
}); |
|||
} |
|||
} |
|||
|
|||
protected virtual string GetCron(int period) |
|||
{ |
|||
var time = TimeSpan.FromMilliseconds(period); |
|||
string cron; |
|||
|
|||
if (time.TotalSeconds <= 59) |
|||
{ |
|||
var seconds = Math.Max(1, (int)Math.Round(time.TotalSeconds)); |
|||
cron = $"*/{seconds} * * * * *"; |
|||
} |
|||
else if (time.TotalMinutes <= 59) |
|||
{ |
|||
var minutes = Math.Max(1, (int)Math.Round(time.TotalMinutes)); |
|||
cron = $"*/{minutes} * * * *"; |
|||
} |
|||
else if (time.TotalHours <= 23) |
|||
{ |
|||
var hours = Math.Max(1, (int)Math.Round(time.TotalHours)); |
|||
cron = $"0 */{hours} * * *"; |
|||
} |
|||
else if (time.TotalDays <= 31) |
|||
{ |
|||
var days = Math.Max(1, (int)Math.Round(time.TotalDays)); |
|||
cron = $"0 0 0 1/{days} * *"; |
|||
} |
|||
else |
|||
{ |
|||
throw new AbpException($"Cannot convert period: {period} to cron expression."); |
|||
} |
|||
|
|||
return cron; |
|||
} |
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Quartz; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.ExceptionHandling; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Quartz; |
|||
|
|||
public class QuartzDynamicBackgroundWorkerAdapter : IJob, ITransientDependency |
|||
{ |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
public ILogger<QuartzDynamicBackgroundWorkerAdapter> Logger { get; set; } |
|||
|
|||
public QuartzDynamicBackgroundWorkerAdapter( |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
HandlerRegistry = handlerRegistry; |
|||
ServiceProvider = serviceProvider; |
|||
Logger = NullLogger<QuartzDynamicBackgroundWorkerAdapter>.Instance; |
|||
} |
|||
|
|||
public virtual async Task Execute(IJobExecutionContext context) |
|||
{ |
|||
var rawWorkerName = context.MergedJobDataMap.GetString(QuartzDynamicBackgroundWorkerManager.DynamicWorkerNameKey); |
|||
if (string.IsNullOrWhiteSpace(rawWorkerName)) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var workerName = rawWorkerName!; |
|||
var handler = HandlerRegistry.Get(workerName); |
|||
if (handler == null) |
|||
{ |
|||
Logger.LogWarning("No handler registered for dynamic worker: {WorkerName}", workerName); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
await handler( |
|||
new DynamicBackgroundWorkerExecutionContext(workerName, ServiceProvider), |
|||
context.CancellationToken); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
// Swallow the exception to match the behavior of AsyncPeriodicBackgroundWorkerBase,
|
|||
// which catches, notifies and logs without rethrowing. This prevents Quartz from
|
|||
// treating a single failed execution as a job failure and triggering retries.
|
|||
await ServiceProvider.GetRequiredService<IExceptionNotifier>() |
|||
.NotifyAsync(new ExceptionNotificationContext(ex)); |
|||
|
|||
Logger.LogException(ex); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,142 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Quartz; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.Quartz; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class QuartzDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
public const string DynamicWorkerNameKey = "AbpDynamicWorkerName"; |
|||
|
|||
protected IScheduler Scheduler { get; } |
|||
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } |
|||
public ILogger<QuartzDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
public QuartzDynamicBackgroundWorkerManager( |
|||
IScheduler scheduler, |
|||
IDynamicBackgroundWorkerHandlerRegistry handlerRegistry) |
|||
{ |
|||
Scheduler = scheduler; |
|||
HandlerRegistry = handlerRegistry; |
|||
Logger = NullLogger<QuartzDynamicBackgroundWorkerManager>.Instance; |
|||
} |
|||
|
|||
public virtual async Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var jobKey = new JobKey($"DynamicWorker:{workerName}"); |
|||
var triggerKey = new TriggerKey($"DynamicWorker:{workerName}"); |
|||
var jobDetail = JobBuilder.Create<QuartzDynamicBackgroundWorkerAdapter>() |
|||
.WithIdentity(jobKey) |
|||
.UsingJobData(DynamicWorkerNameKey, workerName) |
|||
.Build(); |
|||
|
|||
var trigger = BuildTrigger(schedule, jobDetail, triggerKey); |
|||
|
|||
// Register the handler first so it is available the moment the job fires.
|
|||
HandlerRegistry.Register(workerName, handler); |
|||
try |
|||
{ |
|||
// Use replace=true to avoid TOCTOU race between CheckExists and ScheduleJob.
|
|||
await Scheduler.ScheduleJobs( |
|||
new Dictionary<IJobDetail, IReadOnlyCollection<ITrigger>> |
|||
{ |
|||
{ jobDetail, new[] { trigger } } |
|||
}, |
|||
replace: true, |
|||
cancellationToken); |
|||
} |
|||
catch |
|||
{ |
|||
HandlerRegistry.Unregister(workerName); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
// Always delete the persistent Quartz job regardless of in-memory registry state.
|
|||
// This ensures cleanup works correctly after an application restart, when the registry
|
|||
// is empty but the Quartz job may still exist in the scheduler store.
|
|||
var jobKey = new JobKey($"DynamicWorker:{workerName}"); |
|||
var deleted = await Scheduler.DeleteJob(jobKey, cancellationToken); |
|||
var wasRegistered = HandlerRegistry.Unregister(workerName); |
|||
|
|||
return deleted || wasRegistered; |
|||
} |
|||
|
|||
public virtual async Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
var triggerKey = new TriggerKey($"DynamicWorker:{workerName}"); |
|||
var jobKey = new JobKey($"DynamicWorker:{workerName}"); |
|||
var jobDetail = JobBuilder.Create<QuartzDynamicBackgroundWorkerAdapter>() |
|||
.WithIdentity(jobKey) |
|||
.UsingJobData(DynamicWorkerNameKey, workerName) |
|||
.Build(); |
|||
|
|||
var trigger = BuildTrigger(schedule, jobDetail, triggerKey); |
|||
|
|||
// Always attempt to reschedule the persistent job regardless of in-memory registry state.
|
|||
// This ensures UpdateScheduleAsync works correctly after an application restart,
|
|||
// when the registry is empty but the Quartz job may still exist in the scheduler store.
|
|||
// RescheduleJob returns null if the trigger was not found, indicating the job did not exist.
|
|||
var result = await Scheduler.RescheduleJob(triggerKey, trigger, cancellationToken); |
|||
return result != null; |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return HandlerRegistry.IsRegistered(workerName); |
|||
} |
|||
|
|||
public virtual Task StopAllAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
HandlerRegistry.Clear(); |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
protected virtual ITrigger BuildTrigger(DynamicBackgroundWorkerSchedule schedule, IJobDetail jobDetail, TriggerKey triggerKey) |
|||
{ |
|||
var triggerBuilder = TriggerBuilder.Create() |
|||
.ForJob(jobDetail) |
|||
.WithIdentity(triggerKey); |
|||
|
|||
if (!schedule.CronExpression.IsNullOrWhiteSpace()) |
|||
{ |
|||
triggerBuilder.WithCronSchedule(schedule.CronExpression); |
|||
} |
|||
else |
|||
{ |
|||
triggerBuilder.WithSimpleSchedule(builder => |
|||
builder.WithInterval(TimeSpan.FromMilliseconds(schedule.Period!.Value)).RepeatForever()); |
|||
} |
|||
|
|||
return triggerBuilder.Build(); |
|||
} |
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers.TickerQ; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
public virtual Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
throw new AbpException( |
|||
"TickerQ does not support dynamic background worker registration at runtime. " + |
|||
"TickerQ uses FrozenDictionary for function registration, which requires all functions to be registered before the application starts. " + |
|||
"Please use Hangfire or Quartz provider for dynamic background workers."); |
|||
} |
|||
|
|||
public virtual Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
throw new AbpException( |
|||
"TickerQ does not support dynamic background worker registration at runtime. " + |
|||
"Please use Hangfire or Quartz provider for dynamic background workers."); |
|||
} |
|||
|
|||
public virtual Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
throw new AbpException( |
|||
"TickerQ does not support dynamic background worker registration at runtime. " + |
|||
"Please use Hangfire or Quartz provider for dynamic background workers."); |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
// TickerQ does not support runtime registration, so there are never any registered workers.
|
|||
return false; |
|||
} |
|||
|
|||
public virtual Task StopAllAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
return Task.CompletedTask; |
|||
} |
|||
} |
|||
@ -0,0 +1,193 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerManager, ISingletonDependency |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
public ILogger<DefaultDynamicBackgroundWorkerManager> Logger { get; set; } |
|||
|
|||
private readonly ConcurrentDictionary<string, InMemoryDynamicBackgroundWorker> _dynamicWorkers; |
|||
private readonly SemaphoreSlim _semaphore; |
|||
private volatile bool _isDisposed; |
|||
|
|||
public DefaultDynamicBackgroundWorkerManager(IServiceProvider serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Logger = NullLogger<DefaultDynamicBackgroundWorkerManager>.Instance; |
|||
_dynamicWorkers = new ConcurrentDictionary<string, InMemoryDynamicBackgroundWorker>(); |
|||
_semaphore = new SemaphoreSlim(1, 1); |
|||
} |
|||
|
|||
public virtual async Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (schedule.Period == null) |
|||
{ |
|||
throw new AbpException( |
|||
$"The default in-memory background worker manager does not support CronExpression without Period for dynamic worker '{workerName}'. " + |
|||
"Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ)."); |
|||
} |
|||
|
|||
await _semaphore.WaitAsync(cancellationToken); |
|||
try |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
throw new ObjectDisposedException(nameof(DefaultDynamicBackgroundWorkerManager)); |
|||
} |
|||
|
|||
if (_dynamicWorkers.TryRemove(workerName, out var existingWorker)) |
|||
{ |
|||
await existingWorker.StopAsync(cancellationToken); |
|||
Logger.LogInformation("Replaced existing dynamic worker: {WorkerName}", workerName); |
|||
} |
|||
|
|||
var worker = CreateDynamicWorker(workerName, schedule, handler); |
|||
_dynamicWorkers[workerName] = worker; |
|||
|
|||
await worker.StartAsync(cancellationToken); |
|||
} |
|||
finally |
|||
{ |
|||
_semaphore.Release(); |
|||
} |
|||
} |
|||
|
|||
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
|
|||
await _semaphore.WaitAsync(cancellationToken); |
|||
try |
|||
{ |
|||
if (!_dynamicWorkers.TryRemove(workerName, out var worker)) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
await worker.StopAsync(cancellationToken); |
|||
return true; |
|||
} |
|||
finally |
|||
{ |
|||
_semaphore.Release(); |
|||
} |
|||
} |
|||
|
|||
public virtual async Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
schedule.Validate(); |
|||
|
|||
if (schedule.Period == null) |
|||
{ |
|||
throw new AbpException( |
|||
$"The default in-memory background worker manager does not support CronExpression without Period for dynamic worker '{workerName}'. " + |
|||
"Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ)."); |
|||
} |
|||
|
|||
await _semaphore.WaitAsync(cancellationToken); |
|||
try |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
throw new ObjectDisposedException(nameof(DefaultDynamicBackgroundWorkerManager)); |
|||
} |
|||
|
|||
if (!_dynamicWorkers.TryGetValue(workerName, out var worker)) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
worker.UpdateSchedule(schedule); |
|||
return true; |
|||
} |
|||
finally |
|||
{ |
|||
_semaphore.Release(); |
|||
} |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return _dynamicWorkers.ContainsKey(workerName); |
|||
} |
|||
|
|||
public virtual async Task StopAllAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
await _semaphore.WaitAsync(cancellationToken); |
|||
try |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
_isDisposed = true; |
|||
|
|||
foreach (var kvp in _dynamicWorkers) |
|||
{ |
|||
try |
|||
{ |
|||
await kvp.Value.StopAsync(cancellationToken); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
Logger.LogException(ex); |
|||
} |
|||
} |
|||
|
|||
_dynamicWorkers.Clear(); |
|||
} |
|||
finally |
|||
{ |
|||
_semaphore.Release(); |
|||
} |
|||
} |
|||
|
|||
protected virtual InMemoryDynamicBackgroundWorker CreateDynamicWorker( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler) |
|||
{ |
|||
var timer = ServiceProvider.GetRequiredService<AbpAsyncTimer>(); |
|||
var serviceScopeFactory = ServiceProvider.GetRequiredService<IServiceScopeFactory>(); |
|||
|
|||
var worker = new InMemoryDynamicBackgroundWorker( |
|||
workerName, schedule, handler, timer, serviceScopeFactory); |
|||
|
|||
worker.ServiceProvider = ServiceProvider; |
|||
worker.LazyServiceProvider = ServiceProvider.GetRequiredService<IAbpLazyServiceProvider>(); |
|||
|
|||
return worker; |
|||
} |
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
using System; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class DynamicBackgroundWorkerExecutionContext |
|||
{ |
|||
public string WorkerName { get; } |
|||
|
|||
public IServiceProvider ServiceProvider { get; } |
|||
|
|||
public DynamicBackgroundWorkerExecutionContext(string workerName, IServiceProvider serviceProvider) |
|||
{ |
|||
WorkerName = Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
ServiceProvider = Check.NotNull(serviceProvider, nameof(serviceProvider)); |
|||
} |
|||
} |
|||
@ -0,0 +1,6 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public delegate Task DynamicBackgroundWorkerHandler(DynamicBackgroundWorkerExecutionContext context, CancellationToken cancellationToken); |
|||
@ -0,0 +1,52 @@ |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class DynamicBackgroundWorkerHandlerRegistry : IDynamicBackgroundWorkerHandlerRegistry, ISingletonDependency |
|||
{ |
|||
protected ConcurrentDictionary<string, DynamicBackgroundWorkerHandler> Handlers { get; } |
|||
|
|||
public DynamicBackgroundWorkerHandlerRegistry() |
|||
{ |
|||
Handlers = new ConcurrentDictionary<string, DynamicBackgroundWorkerHandler>(); |
|||
} |
|||
|
|||
public virtual void Register(string workerName, DynamicBackgroundWorkerHandler handler) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(handler, nameof(handler)); |
|||
|
|||
Handlers[workerName] = handler; |
|||
} |
|||
|
|||
public virtual bool Unregister(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return Handlers.TryRemove(workerName, out _); |
|||
} |
|||
|
|||
public virtual bool IsRegistered(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return Handlers.ContainsKey(workerName); |
|||
} |
|||
|
|||
public virtual DynamicBackgroundWorkerHandler? Get(string workerName) |
|||
{ |
|||
Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
return Handlers.TryGetValue(workerName, out var handler) ? handler : null; |
|||
} |
|||
|
|||
public virtual IReadOnlyCollection<string> GetAllNames() |
|||
{ |
|||
return Handlers.Keys.ToList().AsReadOnly(); |
|||
} |
|||
|
|||
public virtual void Clear() |
|||
{ |
|||
Handlers.Clear(); |
|||
} |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public static class DynamicBackgroundWorkerManagerExtensions |
|||
{ |
|||
/// <summary>
|
|||
/// Adds a dynamic worker with the default schedule (<see cref="DynamicBackgroundWorkerSchedule.DefaultPeriod"/>).
|
|||
/// </summary>
|
|||
public static Task AddAsync( |
|||
this IDynamicBackgroundWorkerManager manager, |
|||
string workerName, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
return manager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = DynamicBackgroundWorkerSchedule.DefaultPeriod |
|||
}, |
|||
handler, |
|||
cancellationToken); |
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
using System; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class DynamicBackgroundWorkerSchedule |
|||
{ |
|||
public const int DefaultPeriod = 60000; |
|||
|
|||
public int? Period { get; set; } |
|||
|
|||
public string? CronExpression { get; set; } |
|||
|
|||
public virtual void Validate() |
|||
{ |
|||
if (Period.HasValue && Period.Value <= 0) |
|||
{ |
|||
throw new ArgumentException( |
|||
$"Period must be greater than 0 when provided. Given value: {Period.Value}.", |
|||
nameof(Period)); |
|||
} |
|||
|
|||
if (Period == null && string.IsNullOrWhiteSpace(CronExpression)) |
|||
{ |
|||
throw new ArgumentException( |
|||
"At least one of 'Period' or 'CronExpression' must be set."); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,18 @@ |
|||
using System.Collections.Generic; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public interface IDynamicBackgroundWorkerHandlerRegistry |
|||
{ |
|||
void Register(string workerName, DynamicBackgroundWorkerHandler handler); |
|||
|
|||
bool Unregister(string workerName); |
|||
|
|||
bool IsRegistered(string workerName); |
|||
|
|||
DynamicBackgroundWorkerHandler? Get(string workerName); |
|||
|
|||
IReadOnlyCollection<string> GetAllNames(); |
|||
|
|||
void Clear(); |
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
/// <summary>
|
|||
/// Manages dynamic background workers that are registered at runtime
|
|||
/// without requiring a strongly-typed worker class.
|
|||
/// </summary>
|
|||
public interface IDynamicBackgroundWorkerManager |
|||
{ |
|||
/// <summary>
|
|||
/// Adds a dynamic worker by name, schedule and handler.
|
|||
/// If a worker with the same name already exists, it will be replaced.
|
|||
/// </summary>
|
|||
Task AddAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
CancellationToken cancellationToken = default); |
|||
|
|||
/// <summary>
|
|||
/// Removes a previously added dynamic worker by name.
|
|||
/// Always attempts to remove both the in-memory handler and any persistent scheduling record
|
|||
/// (Hangfire RecurringJob or Quartz job), regardless of current in-memory state.
|
|||
/// Returns true if the handler was registered in memory at the time of the call, or if
|
|||
/// the persistent scheduling record was found and deleted (provider-dependent).
|
|||
/// May return false after an application restart even if a persistent record was cleaned up,
|
|||
/// when the provider cannot report whether a persistent record existed (e.g. Hangfire).
|
|||
/// </summary>
|
|||
Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default); |
|||
|
|||
/// <summary>
|
|||
/// Updates the schedule of a previously added dynamic worker.
|
|||
/// Returns true if the worker was found and updated; false otherwise.
|
|||
/// </summary>
|
|||
Task<bool> UpdateScheduleAsync( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
CancellationToken cancellationToken = default); |
|||
|
|||
/// <summary>
|
|||
/// Checks whether a dynamic worker with the given name is registered.
|
|||
/// </summary>
|
|||
bool IsRegistered(string workerName); |
|||
|
|||
/// <summary>
|
|||
/// Stops all dynamic workers and releases resources.
|
|||
/// Called during application shutdown.
|
|||
/// </summary>
|
|||
Task StopAllAsync(CancellationToken cancellationToken = default); |
|||
} |
|||
@ -0,0 +1,51 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class InMemoryDynamicBackgroundWorker : AsyncPeriodicBackgroundWorkerBase |
|||
{ |
|||
public string WorkerName { get; } |
|||
|
|||
private readonly DynamicBackgroundWorkerHandler _handler; |
|||
|
|||
public InMemoryDynamicBackgroundWorker( |
|||
string workerName, |
|||
DynamicBackgroundWorkerSchedule schedule, |
|||
DynamicBackgroundWorkerHandler handler, |
|||
AbpAsyncTimer timer, |
|||
IServiceScopeFactory serviceScopeFactory) |
|||
: base(timer, serviceScopeFactory) |
|||
{ |
|||
WorkerName = Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
_handler = Check.NotNull(handler, nameof(handler)); |
|||
|
|||
Timer.Period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod; |
|||
CronExpression = schedule.CronExpression; |
|||
} |
|||
|
|||
public virtual void UpdateSchedule(DynamicBackgroundWorkerSchedule schedule) |
|||
{ |
|||
Check.NotNull(schedule, nameof(schedule)); |
|||
|
|||
Timer.Stop(); |
|||
Timer.Period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod; |
|||
CronExpression = schedule.CronExpression; |
|||
Timer.Start(StartCancellationToken); |
|||
} |
|||
|
|||
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) |
|||
{ |
|||
await _handler( |
|||
new DynamicBackgroundWorkerExecutionContext(WorkerName, workerContext.ServiceProvider), |
|||
workerContext.CancellationToken); |
|||
} |
|||
|
|||
public override string ToString() |
|||
{ |
|||
return $"DynamicWorker:{WorkerName}"; |
|||
} |
|||
} |
|||
@ -0,0 +1,8 @@ |
|||
using System.Collections.Concurrent; |
|||
|
|||
namespace Volo.Abp.BackgroundJobs; |
|||
|
|||
public class DynamicJobExecutionTracker |
|||
{ |
|||
public ConcurrentBag<string> ExecutedJsonData { get; } = new(); |
|||
} |
|||
@ -0,0 +1,98 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Shouldly; |
|||
using Volo.Abp.BackgroundJobs; |
|||
using Volo.Abp.BackgroundWorkers; |
|||
using Xunit; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
/// <summary>
|
|||
/// Isolated tests for <see cref="IDynamicBackgroundWorkerManager.StopAllAsync"/>.
|
|||
/// Kept in a separate class so the singleton manager is fresh per test (xUnit creates
|
|||
/// a new test class instance — and therefore a new ABP application — for each test method).
|
|||
/// </summary>
|
|||
public class DynamicBackgroundWorkerManager_StopAll_Tests : BackgroundJobsTestBase |
|||
{ |
|||
private readonly IDynamicBackgroundWorkerManager _dynamicWorkerManager; |
|||
|
|||
public DynamicBackgroundWorkerManager_StopAll_Tests() |
|||
{ |
|||
_dynamicWorkerManager = GetRequiredService<IDynamicBackgroundWorkerManager>(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Stop_All_Workers_And_Clear_Registry() |
|||
{ |
|||
var workerName1 = "stop-all-worker-1-" + Guid.NewGuid(); |
|||
var workerName2 = "stop-all-worker-2-" + Guid.NewGuid(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName1, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName2, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName1).ShouldBeTrue(); |
|||
_dynamicWorkerManager.IsRegistered(workerName2).ShouldBeTrue(); |
|||
|
|||
await _dynamicWorkerManager.StopAllAsync(); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName1).ShouldBeFalse(); |
|||
_dynamicWorkerManager.IsRegistered(workerName2).ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Throw_ObjectDisposedException_When_AddAsync_Called_After_StopAllAsync() |
|||
{ |
|||
await _dynamicWorkerManager.StopAllAsync(); |
|||
|
|||
await Assert.ThrowsAsync<ObjectDisposedException>(() => |
|||
_dynamicWorkerManager.AddAsync( |
|||
"post-stop-worker-" + Guid.NewGuid(), |
|||
new DynamicBackgroundWorkerSchedule { Period = 1000 }, |
|||
(_, _) => Task.CompletedTask |
|||
) |
|||
); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Throw_ObjectDisposedException_When_UpdateScheduleAsync_Called_After_StopAllAsync() |
|||
{ |
|||
var workerName = "update-after-stop-" + Guid.NewGuid(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
await _dynamicWorkerManager.StopAllAsync(); |
|||
|
|||
await Assert.ThrowsAsync<ObjectDisposedException>(() => |
|||
_dynamicWorkerManager.UpdateScheduleAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 1000 } |
|||
) |
|||
); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Be_Idempotent_When_StopAllAsync_Called_Multiple_Times() |
|||
{ |
|||
await _dynamicWorkerManager.AddAsync( |
|||
"idempotent-stop-" + Guid.NewGuid(), |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
// Should not throw when called multiple times
|
|||
await _dynamicWorkerManager.StopAllAsync(); |
|||
await _dynamicWorkerManager.StopAllAsync(); |
|||
} |
|||
} |
|||
@ -0,0 +1,387 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Shouldly; |
|||
using Volo.Abp.BackgroundJobs; |
|||
using Volo.Abp.BackgroundWorkers; |
|||
using Xunit; |
|||
|
|||
namespace Volo.Abp.BackgroundWorkers; |
|||
|
|||
public class DynamicBackgroundWorkerManager_Tests : BackgroundJobsTestBase |
|||
{ |
|||
private readonly IDynamicBackgroundWorkerManager _dynamicWorkerManager; |
|||
|
|||
public DynamicBackgroundWorkerManager_Tests() |
|||
{ |
|||
_dynamicWorkerManager = GetRequiredService<IDynamicBackgroundWorkerManager>(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Register_Dynamic_Worker() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = 1000 |
|||
}, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Execute_Dynamic_Handler() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = 50 |
|||
}, |
|||
(context, _) => |
|||
{ |
|||
if (context.WorkerName == workerName) |
|||
{ |
|||
tcs.TrySetResult(true); |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
); |
|||
|
|||
var completedTask = await Task.WhenAny(tcs.Task, Task.Delay(5000)); |
|||
completedTask.ShouldBe(tcs.Task); |
|||
(await tcs.Task).ShouldBeTrue(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Add_Dynamic_Worker_With_Default_Schedule() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Remove_Dynamic_Worker() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = 1000 |
|||
}, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
|
|||
var result = await _dynamicWorkerManager.RemoveAsync(workerName); |
|||
result.ShouldBeTrue(); |
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Return_False_When_Removing_NonExistent_Worker() |
|||
{ |
|||
var result = await _dynamicWorkerManager.RemoveAsync("non-existent-worker-" + Guid.NewGuid()); |
|||
result.ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Update_Dynamic_Worker_Schedule() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
var executionCount = 0; |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = 60000 |
|||
}, |
|||
(_, _) => |
|||
{ |
|||
Interlocked.Increment(ref executionCount); |
|||
return Task.CompletedTask; |
|||
} |
|||
); |
|||
|
|||
var result = await _dynamicWorkerManager.UpdateScheduleAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule |
|||
{ |
|||
Period = 50 |
|||
} |
|||
); |
|||
|
|||
result.ShouldBeTrue(); |
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
|
|||
var timeout = TimeSpan.FromSeconds(5); |
|||
var startTime = DateTime.UtcNow; |
|||
while (executionCount == 0 && DateTime.UtcNow - startTime < timeout) |
|||
{ |
|||
await Task.Delay(50); |
|||
} |
|||
|
|||
executionCount.ShouldBeGreaterThan(0); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Return_False_When_Updating_NonExistent_Worker() |
|||
{ |
|||
var result = await _dynamicWorkerManager.UpdateScheduleAsync( |
|||
"non-existent-worker-" + Guid.NewGuid(), |
|||
new DynamicBackgroundWorkerSchedule { Period = 1000 } |
|||
); |
|||
|
|||
result.ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Replace_Existing_Worker_When_Same_Name_Added() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
var secondHandlerTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 50 }, |
|||
(_, _) => |
|||
{ |
|||
secondHandlerTcs.TrySetResult(true); |
|||
return Task.CompletedTask; |
|||
} |
|||
); |
|||
|
|||
var completedTask = await Task.WhenAny(secondHandlerTcs.Task, Task.Delay(5000)); |
|||
completedTask.ShouldBe(secondHandlerTcs.Task); |
|||
(await secondHandlerTcs.Task).ShouldBeTrue(); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
|
|||
var removed = await _dynamicWorkerManager.RemoveAsync(workerName); |
|||
removed.ShouldBeTrue(); |
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Throw_When_Period_Is_Zero() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
|
|||
await Assert.ThrowsAsync<ArgumentException>(async () => |
|||
{ |
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 0 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Throw_When_Period_Is_Negative() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
|
|||
await Assert.ThrowsAsync<ArgumentException>(async () => |
|||
{ |
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = -1000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Throw_When_No_Period_And_No_CronExpression() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
|
|||
await Assert.ThrowsAsync<ArgumentException>(async () => |
|||
{ |
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule(), |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Continue_Running_After_Handler_Throws_Exception() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
var callCount = 0; |
|||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 50 }, |
|||
(_, _) => |
|||
{ |
|||
var count = Interlocked.Increment(ref callCount); |
|||
if (count == 1) |
|||
{ |
|||
throw new InvalidOperationException("Simulated failure"); |
|||
} |
|||
|
|||
tcs.TrySetResult(true); |
|||
return Task.CompletedTask; |
|||
} |
|||
); |
|||
|
|||
var completedTask = await Task.WhenAny(tcs.Task, Task.Delay(5000)); |
|||
completedTask.ShouldBe(tcs.Task); |
|||
callCount.ShouldBeGreaterThan(1); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Not_Be_Registered_After_Remove() |
|||
{ |
|||
var workerName = "dynamic-worker-" + Guid.NewGuid(); |
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 1000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
|
|||
await _dynamicWorkerManager.RemoveAsync(workerName); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Handle_Concurrent_Add_With_Same_Name() |
|||
{ |
|||
var workerName = "concurrent-worker-" + Guid.NewGuid(); |
|||
var executedHandlerIds = new ConcurrentBag<int>(); |
|||
|
|||
var tasks = Enumerable.Range(0, 10).Select(i => |
|||
_dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => |
|||
{ |
|||
executedHandlerIds.Add(i); |
|||
return Task.CompletedTask; |
|||
} |
|||
) |
|||
).ToList(); |
|||
|
|||
await Task.WhenAll(tasks); |
|||
|
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); |
|||
|
|||
var removed = await _dynamicWorkerManager.RemoveAsync(workerName); |
|||
removed.ShouldBeTrue(); |
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Handle_Concurrent_Add_And_Remove() |
|||
{ |
|||
var workerNames = Enumerable.Range(0, 10) |
|||
.Select(i => $"concurrent-worker-{i}-" + Guid.NewGuid()) |
|||
.ToList(); |
|||
|
|||
var addTasks = workerNames.Select(name => |
|||
_dynamicWorkerManager.AddAsync( |
|||
name, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
) |
|||
).ToList(); |
|||
|
|||
await Task.WhenAll(addTasks); |
|||
|
|||
foreach (var name in workerNames) |
|||
{ |
|||
_dynamicWorkerManager.IsRegistered(name).ShouldBeTrue(); |
|||
} |
|||
|
|||
var removeTasks = workerNames.Select(name => |
|||
_dynamicWorkerManager.RemoveAsync(name) |
|||
).ToList(); |
|||
|
|||
var results = await Task.WhenAll(removeTasks); |
|||
|
|||
results.ShouldAllBe(r => r); |
|||
|
|||
foreach (var name in workerNames) |
|||
{ |
|||
_dynamicWorkerManager.IsRegistered(name).ShouldBeFalse(); |
|||
} |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_Handle_Concurrent_Add_Remove_Update() |
|||
{ |
|||
var workerName = "concurrent-mixed-" + Guid.NewGuid(); |
|||
|
|||
await _dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 60000 }, |
|||
(_, _) => Task.CompletedTask |
|||
); |
|||
|
|||
var tasks = new List<Task> |
|||
{ |
|||
_dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 30000 }, |
|||
(_, _) => Task.CompletedTask |
|||
), |
|||
_dynamicWorkerManager.UpdateScheduleAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 20000 } |
|||
), |
|||
_dynamicWorkerManager.AddAsync( |
|||
workerName, |
|||
new DynamicBackgroundWorkerSchedule { Period = 10000 }, |
|||
(_, _) => Task.CompletedTask |
|||
) |
|||
}; |
|||
|
|||
await Task.WhenAll(tasks); |
|||
|
|||
// After all concurrent operations, worker should still be in a consistent state
|
|||
var isRegistered = _dynamicWorkerManager.IsRegistered(workerName); |
|||
isRegistered.ShouldBeTrue(); |
|||
|
|||
var removed = await _dynamicWorkerManager.RemoveAsync(workerName); |
|||
removed.ShouldBeTrue(); |
|||
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); |
|||
} |
|||
} |
|||
Loading…
Reference in new issue