diff --git a/framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQDynamicBackgroundWorkerManager.cs b/framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQDynamicBackgroundWorkerManager.cs index f4850777dd..41209cce68 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQDynamicBackgroundWorkerManager.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQDynamicBackgroundWorkerManager.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -22,6 +23,8 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; } public ILogger Logger { get; set; } + private readonly ConcurrentDictionary _cronTickerIds; + public TickerQDynamicBackgroundWorkerManager( AbpTickerQFunctionProvider abpTickerQFunctionProvider, AbpTickerQBackgroundWorkersProvider abpTickerQBackgroundWorkersProvider, @@ -33,6 +36,7 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan CronTickerManager = cronTickerManager; HandlerRegistry = handlerRegistry; Logger = NullLogger.Instance; + _cronTickerIds = new ConcurrentDictionary(); } public virtual async Task AddAsync( @@ -47,6 +51,12 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan schedule.Validate(); + // If replacing an existing worker, delete the old cron entry first + if (_cronTickerIds.TryRemove(workerName, out var existingId)) + { + await CronTickerManager.DeleteAsync(existingId, cancellationToken); + } + var cronExpression = schedule.CronExpression ?? GetCron(schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod); var functionName = $"DynamicWorker:{workerName}"; @@ -81,22 +91,27 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan WorkerType = typeof(TickerQDynamicBackgroundWorkerManager) }; - await CronTickerManager.AddAsync(new CronTickerEntity + var result = await CronTickerManager.AddAsync(new CronTickerEntity { Function = functionName, Expression = cronExpression - }); + }, cancellationToken); + + if (result.IsSucceeded && result.Result != null) + { + _cronTickerIds[workerName] = result.Result.Id; + } HandlerRegistry.Register(workerName, handler); } - public virtual Task RemoveAsync(string workerName, CancellationToken cancellationToken = default) + public virtual async Task RemoveAsync(string workerName, CancellationToken cancellationToken = default) { Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); if (!HandlerRegistry.IsRegistered(workerName)) { - return Task.FromResult(false); + return false; } var functionName = $"DynamicWorker:{workerName}"; @@ -104,14 +119,12 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan AbpTickerQBackgroundWorkersProvider.BackgroundWorkers.Remove(functionName); HandlerRegistry.Unregister(workerName); - // ICronTickerManager does not provide a remove API, so the persisted - // cron entry will remain in storage. The handler is unregistered above, - // so the entry will find a null handler and skip execution silently. - Logger.LogWarning( - "Dynamic worker '{WorkerName}' removed from memory, but the persisted TickerQ cron entry may remain in storage.", - workerName); + if (_cronTickerIds.TryRemove(workerName, out var cronTickerId)) + { + await CronTickerManager.DeleteAsync(cronTickerId, cancellationToken); + } - return Task.FromResult(true); + return true; } public virtual async Task UpdateScheduleAsync( @@ -137,11 +150,22 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan existingWorker.CronExpression = cronExpression; } - await CronTickerManager.AddAsync(new CronTickerEntity + // Delete old entry and create new one with updated expression + if (_cronTickerIds.TryRemove(workerName, out var oldCronTickerId)) + { + await CronTickerManager.DeleteAsync(oldCronTickerId, cancellationToken); + } + + var result = await CronTickerManager.AddAsync(new CronTickerEntity { Function = functionName, Expression = cronExpression - }); + }, cancellationToken); + + if (result.IsSucceeded && result.Result != null) + { + _cronTickerIds[workerName] = result.Result.Id; + } return true; }