Browse Source

Add dynamic worker remove/update APIs

Introduce RemoveAsync and UpdateScheduleAsync to IBackgroundWorkerManager and implement them for the default in-memory manager, Hangfire, Quartz and TickerQ providers. The default BackgroundWorkerManager now tracks dynamic workers in a dictionary, supports stopping/removing workers and recreating workers with a new schedule; provider-specific managers update scheduler entries or remove recurring jobs accordingly. Documentation updated with usage examples for removing and updating schedules, and unit tests added to cover removal, schedule updates and non-existent worker behavior.
pull/25066/head
SALİH ÖZKARA 3 weeks ago
parent
commit
15202a910c
  1. 18
      docs/en/framework/infrastructure/background-workers/index.md
  2. 64
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs
  3. 51
      framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs
  4. 44
      framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/AbpTickerQBackgroundWorkerManager.cs
  5. 65
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs
  6. 12
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IBackgroundWorkerManager.cs
  7. 74
      framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs

18
docs/en/framework/infrastructure/background-workers/index.md

@ -140,12 +140,30 @@ await backgroundWorkerManager.AddAsync(
);
```
You can also **remove** a dynamic worker or **update its schedule** at runtime:
```csharp
// Remove a dynamic worker
var removed = await backgroundWorkerManager.RemoveAsync("InventorySyncWorker");
// Update the schedule of a dynamic worker
var updated = await backgroundWorkerManager.UpdateScheduleAsync(
"InventorySyncWorker",
new DynamicBackgroundWorkerSchedule
{
Period = 60000 // change to 60 seconds
}
);
```
Key points:
* `workerName` is the runtime identifier of the dynamic worker.
* The `handler` is registered at runtime and executed through the provider-specific worker manager.
* Provider behavior is preserved. For example, providers with persistent schedulers keep their own scheduling semantics.
* The default in-process manager uses in-memory periodic execution.
* `RemoveAsync` stops and removes a dynamic worker. Returns `true` if the worker was found and removed.
* `UpdateScheduleAsync` changes the schedule of an existing dynamic worker. Returns `true` if the worker was found and updated. The handler itself is not changed.
## Options

64
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

@ -207,6 +207,70 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
return Task.CompletedTask;
}
public override Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
if (!DynamicBackgroundWorkerHandlerRegistry.IsRegistered(workerName))
{
return Task.FromResult(false);
}
var recurringJobId = $"DynamicWorker:{workerName}";
RecurringJob.RemoveIfExists(recurringJobId);
DynamicBackgroundWorkerHandlerRegistry.Unregister(workerName);
return Task.FromResult(true);
}
public override Task<bool> UpdateScheduleAsync(string workerName, DynamicBackgroundWorkerSchedule schedule, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
if (!DynamicBackgroundWorkerHandlerRegistry.IsRegistered(workerName))
{
return Task.FromResult(false);
}
var cronExpression = schedule.CronExpression;
if (cronExpression.IsNullOrWhiteSpace())
{
var period = schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod;
cronExpression = GetCron(period);
}
var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
var queueName = abpHangfireOptions.DefaultQueue;
var recurringJobId = $"DynamicWorker:{workerName}";
if (!JobStorage.Current.HasFeature(JobStorageFeatures.JobQueueProperty))
{
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>(
recurringJobId,
adapter => adapter.DoWorkAsync(workerName, cancellationToken),
cronExpression,
new RecurringJobOptions
{
TimeZone = TimeZoneInfo.Utc
});
}
else
{
RecurringJob.AddOrUpdate<HangfireDynamicBackgroundWorkerAdapter>(
recurringJobId,
queueName,
adapter => adapter.DoWorkAsync(workerName, cancellationToken),
cronExpression,
new RecurringJobOptions
{
TimeZone = TimeZoneInfo.Utc
});
}
return Task.FromResult(true);
}
private static readonly MethodInfo? GetRecurringJobIdMethodInfo = typeof(RecurringJob).GetMethod("GetRecurringJobId", BindingFlags.NonPublic | BindingFlags.Static);
protected virtual string? GetRecurringJobId(IBackgroundWorker worker, Expression<Func<Task>> methodCall)
{

51
framework/src/Volo.Abp.BackgroundWorkers.Quartz/Volo/Abp/BackgroundWorkers/Quartz/QuartzBackgroundWorkerManager.cs

@ -171,4 +171,55 @@ public class QuartzBackgroundWorkerManager : BackgroundWorkerManager, ISingleton
await Scheduler.ScheduleJob(jobDetail, trigger, cancellationToken);
}
}
public override async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
if (!DynamicBackgroundWorkerHandlerRegistry.IsRegistered(workerName))
{
return false;
}
var jobKey = new JobKey($"DynamicWorker:{workerName}");
await Scheduler.DeleteJob(jobKey, cancellationToken);
DynamicBackgroundWorkerHandlerRegistry.Unregister(workerName);
return true;
}
public override async Task<bool> UpdateScheduleAsync(string workerName, DynamicBackgroundWorkerSchedule schedule, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
if (!DynamicBackgroundWorkerHandlerRegistry.IsRegistered(workerName))
{
return false;
}
if (schedule.Period == null && schedule.CronExpression.IsNullOrWhiteSpace())
{
throw new AbpException($"Both 'Period' and 'CronExpression' are not set for dynamic worker {workerName}. You must set at least one of them.");
}
var triggerKey = new TriggerKey($"DynamicWorker:{workerName}");
var triggerBuilder = TriggerBuilder.Create()
.WithIdentity(triggerKey)
.ForJob(new JobKey($"DynamicWorker:{workerName}"));
if (!schedule.CronExpression.IsNullOrWhiteSpace())
{
triggerBuilder.WithCronSchedule(schedule.CronExpression);
}
else
{
triggerBuilder.WithSimpleSchedule(builder =>
builder.WithInterval(TimeSpan.FromMilliseconds(schedule.Period!.Value)).RepeatForever());
}
await Scheduler.RescheduleJob(triggerKey, triggerBuilder.Build(), cancellationToken);
return true;
}
}

44
framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/AbpTickerQBackgroundWorkerManager.cs

@ -139,6 +139,50 @@ public class AbpTickerQBackgroundWorkerManager : BackgroundWorkerManager, ISingl
});
}
public override async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
if (!DynamicBackgroundWorkerHandlerRegistry.IsRegistered(workerName))
{
return false;
}
var functionName = $"DynamicWorker:{workerName}";
AbpTickerQFunctionProvider.Functions.Remove(functionName);
AbpTickerQBackgroundWorkersProvider.BackgroundWorkers.Remove(functionName);
DynamicBackgroundWorkerHandlerRegistry.Unregister(workerName);
return true;
}
public override async Task<bool> UpdateScheduleAsync(string workerName, DynamicBackgroundWorkerSchedule schedule, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
if (!DynamicBackgroundWorkerHandlerRegistry.IsRegistered(workerName))
{
return false;
}
var cronExpression = schedule.CronExpression ?? GetCron(schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod);
var functionName = $"DynamicWorker:{workerName}";
if (AbpTickerQBackgroundWorkersProvider.BackgroundWorkers.TryGetValue(functionName, out var existingWorker))
{
existingWorker.CronExpression = cronExpression;
}
await CronTickerManager.AddAsync(new CronTickerEntity
{
Function = functionName,
Expression = cronExpression
});
return true;
}
protected virtual string GetCron(int period)
{
var time = TimeSpan.FromMilliseconds(period);

65
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs

@ -18,6 +18,7 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
private bool _isDisposed;
private readonly List<IBackgroundWorker> _backgroundWorkers;
private readonly Dictionary<string, InMemoryDynamicBackgroundWorker> _dynamicWorkers;
protected IServiceProvider ServiceProvider { get; }
protected IDynamicBackgroundWorkerHandlerRegistry DynamicBackgroundWorkerHandlerRegistry { get; }
@ -29,6 +30,7 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
IDynamicBackgroundWorkerHandlerRegistry dynamicBackgroundWorkerHandlerRegistry)
{
_backgroundWorkers = new List<IBackgroundWorker>();
_dynamicWorkers = new Dictionary<string, InMemoryDynamicBackgroundWorker>();
ServiceProvider = serviceProvider;
DynamicBackgroundWorkerHandlerRegistry = dynamicBackgroundWorkerHandlerRegistry;
}
@ -88,9 +90,72 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
worker.ServiceProvider = ServiceProvider;
worker.LazyServiceProvider = ServiceProvider.GetRequiredService<IAbpLazyServiceProvider>();
_dynamicWorkers[workerName] = worker;
await AddAsync(worker, cancellationToken);
}
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
if (!_dynamicWorkers.TryGetValue(workerName, out var worker))
{
return false;
}
await worker.StopAsync(cancellationToken);
_backgroundWorkers.Remove(worker);
_dynamicWorkers.Remove(workerName);
DynamicBackgroundWorkerHandlerRegistry.Unregister(workerName);
return true;
}
public virtual async Task<bool> UpdateScheduleAsync(string workerName, DynamicBackgroundWorkerSchedule schedule, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
Check.NotNull(schedule, nameof(schedule));
if (!_dynamicWorkers.TryGetValue(workerName, out var oldWorker))
{
return false;
}
if (schedule.Period == null && !string.IsNullOrWhiteSpace(schedule.CronExpression))
{
throw new AbpException("Default background worker manager does not support cron expression without period.");
}
var handler = DynamicBackgroundWorkerHandlerRegistry.Get(workerName);
if (handler == null)
{
return false;
}
await oldWorker.StopAsync(cancellationToken);
_backgroundWorkers.Remove(oldWorker);
_dynamicWorkers.Remove(workerName);
var timer = ServiceProvider.GetRequiredService<AbpAsyncTimer>();
var serviceScopeFactory = ServiceProvider.GetRequiredService<IServiceScopeFactory>();
var newWorker = new InMemoryDynamicBackgroundWorker(
workerName,
schedule,
timer,
serviceScopeFactory,
DynamicBackgroundWorkerHandlerRegistry
);
newWorker.ServiceProvider = ServiceProvider;
newWorker.LazyServiceProvider = ServiceProvider.GetRequiredService<IAbpLazyServiceProvider>();
_dynamicWorkers[workerName] = newWorker;
await AddAsync(newWorker, cancellationToken);
return true;
}
public virtual void Dispose()
{
if (_isDisposed)

12
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/IBackgroundWorkerManager.cs

@ -35,4 +35,16 @@ public interface IBackgroundWorkerManager : IRunnable
DynamicBackgroundWorkerSchedule schedule,
Func<DynamicBackgroundWorkerExecutionContext, CancellationToken, Task> handler,
CancellationToken cancellationToken = default);
/// <summary>
/// Removes a previously added dynamic worker by name.
/// Returns true if the worker was found and removed; false otherwise.
/// </summary>
Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default);
/// <summary>
/// Updates the schedule of a previously added dynamic worker.
/// Returns true if the worker was found and updated; false otherwise.
/// </summary>
Task<bool> UpdateScheduleAsync(string workerName, DynamicBackgroundWorkerSchedule schedule, CancellationToken cancellationToken = default);
}

74
framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs

@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.BackgroundWorkers;
@ -61,4 +62,77 @@ public class DynamicBackgroundWorkerManager_Tests : BackgroundJobsTestBase
completedTask.ShouldBe(tcs.Task);
(await tcs.Task).ShouldBeTrue();
}
[Fact]
public async Task Should_Remove_Dynamic_Worker()
{
var workerName = "dynamic-worker-" + Guid.NewGuid();
await _backgroundWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = 1000
},
(_, _) => Task.CompletedTask
);
_handlerRegistry.IsRegistered(workerName).ShouldBeTrue();
var result = await _backgroundWorkerManager.RemoveAsync(workerName);
result.ShouldBeTrue();
_handlerRegistry.IsRegistered(workerName).ShouldBeFalse();
}
[Fact]
public async Task Should_Return_False_When_Removing_NonExistent_Worker()
{
var result = await _backgroundWorkerManager.RemoveAsync("non-existent-worker-" + Guid.NewGuid());
result.ShouldBeFalse();
}
[Fact]
public async Task Should_Update_Dynamic_Worker_Schedule()
{
var workerName = "dynamic-worker-" + Guid.NewGuid();
var executionCount = 0;
await _backgroundWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = 60000
},
(_, _) =>
{
Interlocked.Increment(ref executionCount);
return Task.CompletedTask;
}
);
var result = await _backgroundWorkerManager.UpdateScheduleAsync(
workerName,
new DynamicBackgroundWorkerSchedule
{
Period = 50
}
);
result.ShouldBeTrue();
_handlerRegistry.IsRegistered(workerName).ShouldBeTrue();
await Task.Delay(500);
executionCount.ShouldBeGreaterThan(0);
}
[Fact]
public async Task Should_Return_False_When_Updating_NonExistent_Worker()
{
var result = await _backgroundWorkerManager.UpdateScheduleAsync(
"non-existent-worker-" + Guid.NewGuid(),
new DynamicBackgroundWorkerSchedule { Period = 1000 }
);
result.ShouldBeFalse();
}
}

Loading…
Cancel
Save