From d8441d7fc400f98f8648bde0942bfb888a7bab1a Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 30 Dec 2025 10:10:56 +0800 Subject: [PATCH] Add KeyedLock for per-key async locking and update local distributed lock --- .../Volo/Abp/Threading/KeyedLock.cs | 152 +++++++++++++++ .../LocalAbpDistributedLock.cs | 13 +- .../LocalAbpDistributedLockHandle.cs | 26 ++- .../Volo/Abp/Threading/KeyedLock_Tests.cs | 179 ++++++++++++++++++ 4 files changed, 350 insertions(+), 20 deletions(-) create mode 100644 framework/src/Volo.Abp.Core/Volo/Abp/Threading/KeyedLock.cs create mode 100644 framework/test/Volo.Abp.Core.Tests/Volo/Abp/Threading/KeyedLock_Tests.cs diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Threading/KeyedLock.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Threading/KeyedLock.cs new file mode 100644 index 0000000000..80e750ac33 --- /dev/null +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Threading/KeyedLock.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Volo.Abp.Threading; + +/// +/// Per-key asynchronous lock. +/// https://stackoverflow.com/a/31194647 +/// +public static class KeyedLock +{ + private static readonly Dictionary> SemaphoreSlims = new(); + + public static async Task LockAsync(object key) + { + return await LockAsync(key, CancellationToken.None).ConfigureAwait(false); + } + + public static async Task LockAsync(object key, CancellationToken cancellationToken) + { + var semaphore = GetOrCreate(key); + try + { + await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + var toDispose = DecrementRefAndMaybeRemove(key); + toDispose?.Dispose(); + throw; + } + return new Releaser(key); + } + + public static async Task TryLockAsync(object key) + { + return await TryLockAsync(key, default, CancellationToken.None).ConfigureAwait(false); + } + + public static async Task TryLockAsync(object key, TimeSpan timeout, CancellationToken cancellationToken = default) + { + var semaphore = GetOrCreate(key); + bool acquired; + try + { + if (timeout == default) + { + acquired = await semaphore.WaitAsync(0, cancellationToken).ConfigureAwait(false); + } + else + { + acquired = await semaphore.WaitAsync(timeout, cancellationToken).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + var toDispose = DecrementRefAndMaybeRemove(key); + toDispose?.Dispose(); + throw; + } + + if (acquired) + { + return new Releaser(key); + } + + var toDisposeOnFail = DecrementRefAndMaybeRemove(key); + toDisposeOnFail?.Dispose(); + + return null; + } + + private static SemaphoreSlim GetOrCreate(object key) + { + RefCounted item; + lock (SemaphoreSlims) + { + if (SemaphoreSlims.TryGetValue(key, out item!)) + { + ++item.RefCount; + } + else + { + item = new RefCounted(new SemaphoreSlim(1, 1)); + SemaphoreSlims[key] = item; + } + } + return item.Value; + } + + private sealed class RefCounted(T value) + { + public int RefCount { get; set; } = 1; + + public T Value { get; } = value; + } + + private sealed class Releaser(object key) : IDisposable + { + public void Dispose() + { + RefCounted item; + lock (SemaphoreSlims) + { + if (!SemaphoreSlims.TryGetValue(key, out item!)) + { + return; + } + --item.RefCount; + } + item.Value.Release(); + + bool shouldDispose = false; + lock (SemaphoreSlims) + { + if (SemaphoreSlims.TryGetValue(key, out var current) && ReferenceEquals(current, item)) + { + if (item.RefCount == 0) + { + SemaphoreSlims.Remove(key); + shouldDispose = true; + } + } + } + + if (shouldDispose) + { + item.Value.Dispose(); + } + } + } + + private static SemaphoreSlim? DecrementRefAndMaybeRemove(object key) + { + RefCounted? itemToDispose = null; + lock (SemaphoreSlims) + { + if (SemaphoreSlims.TryGetValue(key, out var item)) + { + --item.RefCount; + if (item.RefCount == 0) + { + SemaphoreSlims.Remove(key); + itemToDispose = item; + } + } + } + return itemToDispose?.Value; + } +} diff --git a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs index d8d8eb9d09..5d12d0deeb 100644 --- a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs +++ b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs @@ -1,14 +1,13 @@ using System; -using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; +using Volo.Abp.Threading; namespace Volo.Abp.DistributedLocking; public class LocalAbpDistributedLock : IAbpDistributedLock, ISingletonDependency { - private readonly ConcurrentDictionary _localSyncObjects = new(); protected IDistributedLockKeyNormalizer DistributedLockKeyNormalizer { get; } public LocalAbpDistributedLock(IDistributedLockKeyNormalizer distributedLockKeyNormalizer) @@ -23,9 +22,11 @@ public class LocalAbpDistributedLock : IAbpDistributedLock, ISingletonDependency { Check.NotNullOrWhiteSpace(name, nameof(name)); var key = DistributedLockKeyNormalizer.NormalizeKey(name); - - var semaphore = _localSyncObjects.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)); - var acquired = await semaphore.WaitAsync(timeout, cancellationToken); - return acquired ? new LocalAbpDistributedLockHandle(semaphore) : null; + var disposable = await KeyedLock.TryLockAsync(key, timeout, cancellationToken); + if (disposable == null) + { + return null; + } + return new LocalAbpDistributedLockHandle(disposable); } } diff --git a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLockHandle.cs b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLockHandle.cs index 5ffe95af5e..70427db7b2 100644 --- a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLockHandle.cs +++ b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLockHandle.cs @@ -1,21 +1,19 @@ -using System.Threading; +using System; using System.Threading.Tasks; +using Volo.Abp.DistributedLocking; -namespace Volo.Abp.DistributedLocking +public class LocalAbpDistributedLockHandle : IAbpDistributedLockHandle { - public class LocalAbpDistributedLockHandle : IAbpDistributedLockHandle - { - private readonly SemaphoreSlim _semaphore; + private readonly IDisposable _disposable; - public LocalAbpDistributedLockHandle(SemaphoreSlim semaphore) - { - _semaphore = semaphore; - } + public LocalAbpDistributedLockHandle(IDisposable disposable) + { + _disposable = disposable; + } - public ValueTask DisposeAsync() - { - _semaphore.Release(); - return default; - } + public ValueTask DisposeAsync() + { + _disposable.Dispose(); + return default; } } diff --git a/framework/test/Volo.Abp.Core.Tests/Volo/Abp/Threading/KeyedLock_Tests.cs b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/Threading/KeyedLock_Tests.cs new file mode 100644 index 0000000000..633ef3dd76 --- /dev/null +++ b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/Threading/KeyedLock_Tests.cs @@ -0,0 +1,179 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Linq; +using System.Threading.Tasks; +using Shouldly; +using Xunit; + +namespace Volo.Abp.Threading; + +public class KeyedLock_Tests +{ + [Fact] + public async Task TryLock_Should_Acquire_Immediately_When_Free() + { + var key = "key-try-1"; + var handle = await KeyedLock.TryLockAsync(key); + handle.ShouldNotBeNull(); + handle!.Dispose(); + + var handle2 = await KeyedLock.TryLockAsync(key); + handle2.ShouldNotBeNull(); + handle2!.Dispose(); + } + + [Fact] + public async Task TryLock_Should_Return_Null_When_Already_Locked() + { + var key = "key-try-2"; + using (await KeyedLock.LockAsync(key)) + { + var handle2 = await KeyedLock.TryLockAsync(key); + handle2.ShouldBeNull(); + } + + var handle3 = await KeyedLock.TryLockAsync(key); + handle3.ShouldNotBeNull(); + handle3!.Dispose(); + } + + [Fact] + public async Task LockAsync_Should_Block_Until_Released() + { + var key = "key-block-1"; + var sw = Stopwatch.StartNew(); + + Task inner; + using (await KeyedLock.LockAsync(key)) + { + inner = Task.Run(async () => + { + using (await KeyedLock.LockAsync(key)) + { + // Acquired only after outer lock is released + } + }); + + // While holding the outer lock, inner waiter should not complete + await Task.Delay(150); + inner.IsCompleted.ShouldBeFalse(); + } + + // After releasing, inner should complete; elapsed >= hold time + await inner; + sw.ElapsedMilliseconds.ShouldBeGreaterThanOrEqualTo(150); + } + + [Fact] + public async Task TryLock_With_Timeout_Should_Return_Null_When_Busy() + { + var key = "key-timeout-1"; + using (await KeyedLock.LockAsync(key)) + { + var handle = await KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(50)); + handle.ShouldBeNull(); + } + } + + [Fact] + public async Task TryLock_With_Timeout_Should_Succeed_If_Released_In_Time() + { + var key = "key-timeout-2"; + // Hold the lock manually + var outer = await KeyedLock.LockAsync(key); + var tryTask = KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(200)); + await Task.Delay(50); + // Release within the timeout window + outer.Dispose(); + var handle2 = await tryTask; + handle2.ShouldNotBeNull(); + handle2!.Dispose(); + } + + [Fact] + public async Task LockAsync_With_Cancellation_Should_Rollback_RefCount() + { + var key = "key-cancel-1"; + var cts = new CancellationTokenSource(); + await cts.CancelAsync(); + await Should.ThrowAsync(async () => + { + await KeyedLock.LockAsync(key, cts.Token); + }); + + // After cancellation, we should still be able to acquire the key + var handle = await KeyedLock.TryLockAsync(key); + handle.ShouldNotBeNull(); + handle!.Dispose(); + } + + [Fact] + public async Task TryLock_With_Cancellation_Should_Rollback() + { + var key = "key-cancel-2"; + // Ensure it's initially free + var h0 = await KeyedLock.TryLockAsync(key); + h0?.Dispose(); + + var cts = new CancellationTokenSource(); + await cts.CancelAsync(); + await Should.ThrowAsync(async () => + { + await KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(200), cts.Token); + }); + + // After cancellation, the key should be acquirable + var handle = await KeyedLock.TryLockAsync(key); + handle.ShouldNotBeNull(); + handle!.Dispose(); + } + + [Fact] + public async Task Serializes_Access_For_Same_Key() + { + var key = "key-serial-1"; + int counter = 0; + var tasks = Enumerable.Range(0, 10).Select(async _ => + { + using (await KeyedLock.LockAsync(key)) + { + var current = counter; + await Task.Delay(10); + counter = current + 1; + } + }); + + await Task.WhenAll(tasks); + counter.ShouldBe(10); + } + + [Fact] + public async Task Multiple_Keys_Should_Not_Block_Each_Other() + { + var key1 = "key-multi-1"; + var key2 = "key-multi-2"; + + using (await KeyedLock.LockAsync(key1)) + { + var handle2 = await KeyedLock.TryLockAsync(key2); + handle2.ShouldNotBeNull(); + handle2!.Dispose(); + } + } + + [Fact] + public async Task TryLock_Default_Overload_Delegates_To_Full_Overload() + { + var key = "key-default-1"; + using (await KeyedLock.LockAsync(key)) + { + var h1 = await KeyedLock.TryLockAsync(key); + h1.ShouldBeNull(); + } + + var h2 = await KeyedLock.TryLockAsync(key); + h2.ShouldNotBeNull(); + h2!.Dispose(); + } +}