diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DefaultDynamicBackgroundWorkerManager.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DefaultDynamicBackgroundWorkerManager.cs index 030b6299d0..598bd74014 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DefaultDynamicBackgroundWorkerManager.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DefaultDynamicBackgroundWorkerManager.cs @@ -16,6 +16,7 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan public ILogger Logger { get; set; } private readonly ConcurrentDictionary _dynamicWorkers; + private readonly SemaphoreSlim _semaphore; private bool _isDisposed; public DefaultDynamicBackgroundWorkerManager(IServiceProvider serviceProvider) @@ -23,6 +24,7 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan ServiceProvider = serviceProvider; Logger = NullLogger.Instance; _dynamicWorkers = new ConcurrentDictionary(); + _semaphore = new SemaphoreSlim(1, 1); } public virtual async Task AddAsync( @@ -44,32 +46,48 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan "Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ)."); } - if (_dynamicWorkers.TryRemove(workerName, out var existingWorker)) + await _semaphore.WaitAsync(cancellationToken); + try { - await existingWorker.StopAsync(cancellationToken); - Logger.LogInformation("Replaced existing dynamic worker: {WorkerName}", workerName); - } + if (_dynamicWorkers.TryRemove(workerName, out var existingWorker)) + { + await existingWorker.StopAsync(cancellationToken); + Logger.LogInformation("Replaced existing dynamic worker: {WorkerName}", workerName); + } - var worker = CreateDynamicWorker(workerName, schedule, handler); - _dynamicWorkers[workerName] = worker; + var worker = CreateDynamicWorker(workerName, schedule, handler); + _dynamicWorkers[workerName] = worker; - await worker.StartAsync(cancellationToken); + await worker.StartAsync(cancellationToken); + } + finally + { + _semaphore.Release(); + } } public virtual async Task RemoveAsync(string workerName, CancellationToken cancellationToken = default) { Check.NotNullOrWhiteSpace(workerName, nameof(workerName)); - if (!_dynamicWorkers.TryRemove(workerName, out var worker)) + await _semaphore.WaitAsync(cancellationToken); + try { - return false; - } + if (!_dynamicWorkers.TryRemove(workerName, out var worker)) + { + return false; + } - await worker.StopAsync(cancellationToken); - return true; + await worker.StopAsync(cancellationToken); + return true; + } + finally + { + _semaphore.Release(); + } } - public virtual Task UpdateScheduleAsync( + public virtual async Task UpdateScheduleAsync( string workerName, DynamicBackgroundWorkerSchedule schedule, CancellationToken cancellationToken = default) @@ -86,13 +104,21 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan "Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ)."); } - if (!_dynamicWorkers.TryGetValue(workerName, out var worker)) + await _semaphore.WaitAsync(cancellationToken); + try { - return Task.FromResult(false); - } + if (!_dynamicWorkers.TryGetValue(workerName, out var worker)) + { + return false; + } - worker.UpdateSchedule(schedule); - return Task.FromResult(true); + worker.UpdateSchedule(schedule); + return true; + } + finally + { + _semaphore.Release(); + } } public virtual bool IsRegistered(string workerName) @@ -123,6 +149,7 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan } _dynamicWorkers.Clear(); + _semaphore.Dispose(); } protected virtual InMemoryDynamicBackgroundWorker CreateDynamicWorker( diff --git a/framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs b/framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs index 2c3f8414e6..be99167d85 100644 --- a/framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs +++ b/framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs @@ -1,4 +1,7 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Shouldly; @@ -277,4 +280,107 @@ public class DynamicBackgroundWorkerManager_Tests : BackgroundJobsTestBase _dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); } + + [Fact] + public async Task Should_Handle_Concurrent_Add_With_Same_Name() + { + var workerName = "concurrent-worker-" + Guid.NewGuid(); + var executedHandlerIds = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, 10).Select(i => + _dynamicWorkerManager.AddAsync( + workerName, + new DynamicBackgroundWorkerSchedule { Period = 60000 }, + (_, _) => + { + executedHandlerIds.Add(i); + return Task.CompletedTask; + } + ) + ).ToList(); + + await Task.WhenAll(tasks); + + _dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue(); + + var removed = await _dynamicWorkerManager.RemoveAsync(workerName); + removed.ShouldBeTrue(); + _dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); + } + + [Fact] + public async Task Should_Handle_Concurrent_Add_And_Remove() + { + var workerNames = Enumerable.Range(0, 10) + .Select(i => $"concurrent-worker-{i}-" + Guid.NewGuid()) + .ToList(); + + var addTasks = workerNames.Select(name => + _dynamicWorkerManager.AddAsync( + name, + new DynamicBackgroundWorkerSchedule { Period = 60000 }, + (_, _) => Task.CompletedTask + ) + ).ToList(); + + await Task.WhenAll(addTasks); + + foreach (var name in workerNames) + { + _dynamicWorkerManager.IsRegistered(name).ShouldBeTrue(); + } + + var removeTasks = workerNames.Select(name => + _dynamicWorkerManager.RemoveAsync(name) + ).ToList(); + + var results = await Task.WhenAll(removeTasks); + + results.ShouldAllBe(r => r); + + foreach (var name in workerNames) + { + _dynamicWorkerManager.IsRegistered(name).ShouldBeFalse(); + } + } + + [Fact] + public async Task Should_Handle_Concurrent_Add_Remove_Update() + { + var workerName = "concurrent-mixed-" + Guid.NewGuid(); + + await _dynamicWorkerManager.AddAsync( + workerName, + new DynamicBackgroundWorkerSchedule { Period = 60000 }, + (_, _) => Task.CompletedTask + ); + + var tasks = new List + { + _dynamicWorkerManager.AddAsync( + workerName, + new DynamicBackgroundWorkerSchedule { Period = 30000 }, + (_, _) => Task.CompletedTask + ), + _dynamicWorkerManager.UpdateScheduleAsync( + workerName, + new DynamicBackgroundWorkerSchedule { Period = 20000 } + ), + _dynamicWorkerManager.AddAsync( + workerName, + new DynamicBackgroundWorkerSchedule { Period = 10000 }, + (_, _) => Task.CompletedTask + ) + }; + + await Task.WhenAll(tasks); + + // After all concurrent operations, worker should still be in a consistent state + var isRegistered = _dynamicWorkerManager.IsRegistered(workerName); + isRegistered.ShouldBeTrue(); + + var removed = await _dynamicWorkerManager.RemoveAsync(workerName); + removed.ShouldBeTrue(); + _dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse(); + } }