Browse Source

fix: Enhance dynamic background worker management by improving cron entry handling

pull/25066/head
maliming 1 week ago
parent
commit
fa19685a5b
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 50
      framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQDynamicBackgroundWorkerManager.cs

50
framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQDynamicBackgroundWorkerManager.cs

@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -22,6 +23,8 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
protected IDynamicBackgroundWorkerHandlerRegistry HandlerRegistry { get; }
public ILogger<TickerQDynamicBackgroundWorkerManager> Logger { get; set; }
private readonly ConcurrentDictionary<string, Guid> _cronTickerIds;
public TickerQDynamicBackgroundWorkerManager(
AbpTickerQFunctionProvider abpTickerQFunctionProvider,
AbpTickerQBackgroundWorkersProvider abpTickerQBackgroundWorkersProvider,
@ -33,6 +36,7 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
CronTickerManager = cronTickerManager;
HandlerRegistry = handlerRegistry;
Logger = NullLogger<TickerQDynamicBackgroundWorkerManager>.Instance;
_cronTickerIds = new ConcurrentDictionary<string, Guid>();
}
public virtual async Task AddAsync(
@ -47,6 +51,12 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
schedule.Validate();
// If replacing an existing worker, delete the old cron entry first
if (_cronTickerIds.TryRemove(workerName, out var existingId))
{
await CronTickerManager.DeleteAsync(existingId, cancellationToken);
}
var cronExpression = schedule.CronExpression ?? GetCron(schedule.Period ?? DynamicBackgroundWorkerSchedule.DefaultPeriod);
var functionName = $"DynamicWorker:{workerName}";
@ -81,22 +91,27 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
WorkerType = typeof(TickerQDynamicBackgroundWorkerManager)
};
await CronTickerManager.AddAsync(new CronTickerEntity
var result = await CronTickerManager.AddAsync(new CronTickerEntity
{
Function = functionName,
Expression = cronExpression
});
}, cancellationToken);
if (result.IsSucceeded && result.Result != null)
{
_cronTickerIds[workerName] = result.Result.Id;
}
HandlerRegistry.Register(workerName, handler);
}
public virtual Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
if (!HandlerRegistry.IsRegistered(workerName))
{
return Task.FromResult(false);
return false;
}
var functionName = $"DynamicWorker:{workerName}";
@ -104,14 +119,12 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
AbpTickerQBackgroundWorkersProvider.BackgroundWorkers.Remove(functionName);
HandlerRegistry.Unregister(workerName);
// ICronTickerManager<T> does not provide a remove API, so the persisted
// cron entry will remain in storage. The handler is unregistered above,
// so the entry will find a null handler and skip execution silently.
Logger.LogWarning(
"Dynamic worker '{WorkerName}' removed from memory, but the persisted TickerQ cron entry may remain in storage.",
workerName);
if (_cronTickerIds.TryRemove(workerName, out var cronTickerId))
{
await CronTickerManager.DeleteAsync(cronTickerId, cancellationToken);
}
return Task.FromResult(true);
return true;
}
public virtual async Task<bool> UpdateScheduleAsync(
@ -137,11 +150,22 @@ public class TickerQDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
existingWorker.CronExpression = cronExpression;
}
await CronTickerManager.AddAsync(new CronTickerEntity
// Delete old entry and create new one with updated expression
if (_cronTickerIds.TryRemove(workerName, out var oldCronTickerId))
{
await CronTickerManager.DeleteAsync(oldCronTickerId, cancellationToken);
}
var result = await CronTickerManager.AddAsync(new CronTickerEntity
{
Function = functionName,
Expression = cronExpression
});
}, cancellationToken);
if (result.IsSucceeded && result.Result != null)
{
_cronTickerIds[workerName] = result.Result.Id;
}
return true;
}

Loading…
Cancel
Save