Browse Source

Add SemaphoreSlim for thread safety and concurrency tests

Add SemaphoreSlim to DefaultDynamicBackgroundWorkerManager to protect
AddAsync/RemoveAsync/UpdateScheduleAsync from concurrent access on the
same worker name. Add 3 concurrency test cases covering concurrent
same-name add, concurrent add+remove, and concurrent mixed operations.
pull/25066/head
maliming 1 week ago
parent
commit
fa15bc4cae
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 63
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DefaultDynamicBackgroundWorkerManager.cs
  2. 106
      framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs

63
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/DefaultDynamicBackgroundWorkerManager.cs

@ -16,6 +16,7 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
public ILogger<DefaultDynamicBackgroundWorkerManager> Logger { get; set; }
private readonly ConcurrentDictionary<string, InMemoryDynamicBackgroundWorker> _dynamicWorkers;
private readonly SemaphoreSlim _semaphore;
private bool _isDisposed;
public DefaultDynamicBackgroundWorkerManager(IServiceProvider serviceProvider)
@ -23,6 +24,7 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
ServiceProvider = serviceProvider;
Logger = NullLogger<DefaultDynamicBackgroundWorkerManager>.Instance;
_dynamicWorkers = new ConcurrentDictionary<string, InMemoryDynamicBackgroundWorker>();
_semaphore = new SemaphoreSlim(1, 1);
}
public virtual async Task AddAsync(
@ -44,32 +46,48 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
"Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ).");
}
if (_dynamicWorkers.TryRemove(workerName, out var existingWorker))
await _semaphore.WaitAsync(cancellationToken);
try
{
await existingWorker.StopAsync(cancellationToken);
Logger.LogInformation("Replaced existing dynamic worker: {WorkerName}", workerName);
}
if (_dynamicWorkers.TryRemove(workerName, out var existingWorker))
{
await existingWorker.StopAsync(cancellationToken);
Logger.LogInformation("Replaced existing dynamic worker: {WorkerName}", workerName);
}
var worker = CreateDynamicWorker(workerName, schedule, handler);
_dynamicWorkers[workerName] = worker;
var worker = CreateDynamicWorker(workerName, schedule, handler);
_dynamicWorkers[workerName] = worker;
await worker.StartAsync(cancellationToken);
await worker.StartAsync(cancellationToken);
}
finally
{
_semaphore.Release();
}
}
public virtual async Task<bool> RemoveAsync(string workerName, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(workerName, nameof(workerName));
if (!_dynamicWorkers.TryRemove(workerName, out var worker))
await _semaphore.WaitAsync(cancellationToken);
try
{
return false;
}
if (!_dynamicWorkers.TryRemove(workerName, out var worker))
{
return false;
}
await worker.StopAsync(cancellationToken);
return true;
await worker.StopAsync(cancellationToken);
return true;
}
finally
{
_semaphore.Release();
}
}
public virtual Task<bool> UpdateScheduleAsync(
public virtual async Task<bool> UpdateScheduleAsync(
string workerName,
DynamicBackgroundWorkerSchedule schedule,
CancellationToken cancellationToken = default)
@ -86,13 +104,21 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
"Please set Period, or use a scheduler-backed provider (Hangfire, Quartz, TickerQ).");
}
if (!_dynamicWorkers.TryGetValue(workerName, out var worker))
await _semaphore.WaitAsync(cancellationToken);
try
{
return Task.FromResult(false);
}
if (!_dynamicWorkers.TryGetValue(workerName, out var worker))
{
return false;
}
worker.UpdateSchedule(schedule);
return Task.FromResult(true);
worker.UpdateSchedule(schedule);
return true;
}
finally
{
_semaphore.Release();
}
}
public virtual bool IsRegistered(string workerName)
@ -123,6 +149,7 @@ public class DefaultDynamicBackgroundWorkerManager : IDynamicBackgroundWorkerMan
}
_dynamicWorkers.Clear();
_semaphore.Dispose();
}
protected virtual InMemoryDynamicBackgroundWorker CreateDynamicWorker(

106
framework/test/Volo.Abp.BackgroundJobs.Tests/Volo/Abp/BackgroundJobs/DynamicBackgroundWorkerManager_Tests.cs

@ -1,4 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
@ -277,4 +280,107 @@ public class DynamicBackgroundWorkerManager_Tests : BackgroundJobsTestBase
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse();
}
[Fact]
public async Task Should_Handle_Concurrent_Add_With_Same_Name()
{
var workerName = "concurrent-worker-" + Guid.NewGuid();
var executedHandlerIds = new ConcurrentBag<int>();
var tasks = Enumerable.Range(0, 10).Select(i =>
_dynamicWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule { Period = 60000 },
(_, _) =>
{
executedHandlerIds.Add(i);
return Task.CompletedTask;
}
)
).ToList();
await Task.WhenAll(tasks);
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeTrue();
var removed = await _dynamicWorkerManager.RemoveAsync(workerName);
removed.ShouldBeTrue();
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse();
}
[Fact]
public async Task Should_Handle_Concurrent_Add_And_Remove()
{
var workerNames = Enumerable.Range(0, 10)
.Select(i => $"concurrent-worker-{i}-" + Guid.NewGuid())
.ToList();
var addTasks = workerNames.Select(name =>
_dynamicWorkerManager.AddAsync(
name,
new DynamicBackgroundWorkerSchedule { Period = 60000 },
(_, _) => Task.CompletedTask
)
).ToList();
await Task.WhenAll(addTasks);
foreach (var name in workerNames)
{
_dynamicWorkerManager.IsRegistered(name).ShouldBeTrue();
}
var removeTasks = workerNames.Select(name =>
_dynamicWorkerManager.RemoveAsync(name)
).ToList();
var results = await Task.WhenAll(removeTasks);
results.ShouldAllBe(r => r);
foreach (var name in workerNames)
{
_dynamicWorkerManager.IsRegistered(name).ShouldBeFalse();
}
}
[Fact]
public async Task Should_Handle_Concurrent_Add_Remove_Update()
{
var workerName = "concurrent-mixed-" + Guid.NewGuid();
await _dynamicWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule { Period = 60000 },
(_, _) => Task.CompletedTask
);
var tasks = new List<Task>
{
_dynamicWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule { Period = 30000 },
(_, _) => Task.CompletedTask
),
_dynamicWorkerManager.UpdateScheduleAsync(
workerName,
new DynamicBackgroundWorkerSchedule { Period = 20000 }
),
_dynamicWorkerManager.AddAsync(
workerName,
new DynamicBackgroundWorkerSchedule { Period = 10000 },
(_, _) => Task.CompletedTask
)
};
await Task.WhenAll(tasks);
// After all concurrent operations, worker should still be in a consistent state
var isRegistered = _dynamicWorkerManager.IsRegistered(workerName);
isRegistered.ShouldBeTrue();
var removed = await _dynamicWorkerManager.RemoveAsync(workerName);
removed.ShouldBeTrue();
_dynamicWorkerManager.IsRegistered(workerName).ShouldBeFalse();
}
}

Loading…
Cancel
Save