mirror of https://github.com/abpframework/abp.git
Browse Source
Add KeyedLock for per-key async locking and update local distributed …pull/24506/head
committed by
GitHub
6 changed files with 462 additions and 20 deletions
@ -0,0 +1,242 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
|
||||
|
namespace Volo.Abp.Threading; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Per-key asynchronous lock for coordinating concurrent flows.
|
||||
|
/// </summary>
|
||||
|
/// <remarks>
|
||||
|
/// Based on the pattern described in https://stackoverflow.com/a/31194647.
|
||||
|
/// Use within a <c>using</c> scope to ensure the lock is released via <c>IDisposable.Dispose()</c>.
|
||||
|
/// </remarks>
|
||||
|
public static class KeyedLock |
||||
|
{ |
||||
|
private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims = new(); |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Acquires an exclusive asynchronous lock for the specified <paramref name="key"/>.
|
||||
|
/// This method waits until the lock becomes available.
|
||||
|
/// </summary>
|
||||
|
/// <param name="key">A non-null object that identifies the lock. Objects considered equal by dictionary semantics will share the same lock.</param>
|
||||
|
/// <returns>An <see cref="IDisposable"/> handle that must be disposed to release the lock.</returns>
|
||||
|
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
||||
|
/// <example>
|
||||
|
/// <code>
|
||||
|
/// var key = "my-critical-section";
|
||||
|
/// using (await KeyedLock.LockAsync(key))
|
||||
|
/// {
|
||||
|
/// // protected work
|
||||
|
/// }
|
||||
|
/// </code>
|
||||
|
/// </example>
|
||||
|
public static async Task<IDisposable> LockAsync(object key) |
||||
|
{ |
||||
|
Check.NotNull(key, nameof(key)); |
||||
|
return await LockAsync(key, CancellationToken.None); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Acquires an exclusive asynchronous lock for the specified <paramref name="key"/>, observing a <paramref name="cancellationToken"/>.
|
||||
|
/// </summary>
|
||||
|
/// <param name="key">A non-null object that identifies the lock. Objects considered equal by dictionary semantics will share the same lock.</param>
|
||||
|
/// <param name="cancellationToken">A token to cancel the wait for the lock.</param>
|
||||
|
/// <returns>An <see cref="IDisposable"/> handle that must be disposed to release the lock.</returns>
|
||||
|
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
||||
|
/// <exception cref="OperationCanceledException">Thrown if the wait is canceled via <paramref name="cancellationToken"/>.</exception>
|
||||
|
/// <example>
|
||||
|
/// <code>
|
||||
|
/// var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
/// using (await KeyedLock.LockAsync("db-update", cts.Token))
|
||||
|
/// {
|
||||
|
/// // protected work
|
||||
|
/// }
|
||||
|
/// </code>
|
||||
|
/// </example>
|
||||
|
public static async Task<IDisposable> LockAsync(object key, CancellationToken cancellationToken) |
||||
|
{ |
||||
|
Check.NotNull(key, nameof(key)); |
||||
|
var semaphore = GetOrCreate(key); |
||||
|
try |
||||
|
{ |
||||
|
await semaphore.WaitAsync(cancellationToken); |
||||
|
} |
||||
|
catch (OperationCanceledException) |
||||
|
{ |
||||
|
var toDispose = DecrementRefAndMaybeRemove(key); |
||||
|
toDispose?.Dispose(); |
||||
|
throw; |
||||
|
} |
||||
|
return new Releaser(key); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Attempts to acquire an exclusive lock for the specified <paramref name="key"/> without waiting.
|
||||
|
/// </summary>
|
||||
|
/// <param name="key">A non-null object that identifies the lock.</param>
|
||||
|
/// <returns>
|
||||
|
/// An <see cref="IDisposable"/> handle if the lock was immediately acquired; otherwise <see langword="null"/>.
|
||||
|
/// </returns>
|
||||
|
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
||||
|
/// <example>
|
||||
|
/// <code>
|
||||
|
/// var handle = await KeyedLock.TryLockAsync("cache-key");
|
||||
|
/// if (handle != null)
|
||||
|
/// {
|
||||
|
/// using (handle)
|
||||
|
/// {
|
||||
|
/// // protected work
|
||||
|
/// }
|
||||
|
/// }
|
||||
|
/// </code>
|
||||
|
/// </example>
|
||||
|
public static async Task<IDisposable?> TryLockAsync(object key) |
||||
|
{ |
||||
|
Check.NotNull(key, nameof(key)); |
||||
|
return await TryLockAsync(key, default, CancellationToken.None); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Attempts to acquire an exclusive lock for the specified <paramref name="key"/>, waiting up to <paramref name="timeout"/>.
|
||||
|
/// </summary>
|
||||
|
/// <param name="key">A non-null object that identifies the lock.</param>
|
||||
|
/// <param name="timeout">Maximum time to wait for the lock. If set to <see cref="TimeSpan.Zero"/>, the method performs an immediate, non-blocking attempt.</param>
|
||||
|
/// <param name="cancellationToken">A token to cancel the wait.</param>
|
||||
|
/// <returns>
|
||||
|
/// An <see cref="IDisposable"/> handle if the lock was acquired within the timeout; otherwise <see langword="null"/>.
|
||||
|
/// </returns>
|
||||
|
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
||||
|
/// <exception cref="OperationCanceledException">Thrown if the wait is canceled via <paramref name="cancellationToken"/>.</exception>
|
||||
|
/// <example>
|
||||
|
/// <code>
|
||||
|
/// var handle = await KeyedLock.TryLockAsync("send-mail", TimeSpan.FromSeconds(1));
|
||||
|
/// if (handle != null)
|
||||
|
/// {
|
||||
|
/// using (handle)
|
||||
|
/// {
|
||||
|
/// // protected work
|
||||
|
/// }
|
||||
|
/// }
|
||||
|
/// else
|
||||
|
/// {
|
||||
|
/// // lock not acquired within timeout
|
||||
|
/// }
|
||||
|
/// </code>
|
||||
|
/// </example>
|
||||
|
public static async Task<IDisposable?> TryLockAsync(object key, TimeSpan timeout, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
Check.NotNull(key, nameof(key)); |
||||
|
var semaphore = GetOrCreate(key); |
||||
|
bool acquired; |
||||
|
try |
||||
|
{ |
||||
|
if (timeout == default) |
||||
|
{ |
||||
|
acquired = await semaphore.WaitAsync(0, cancellationToken); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
acquired = await semaphore.WaitAsync(timeout, cancellationToken); |
||||
|
} |
||||
|
} |
||||
|
catch (OperationCanceledException) |
||||
|
{ |
||||
|
var toDispose = DecrementRefAndMaybeRemove(key); |
||||
|
toDispose?.Dispose(); |
||||
|
throw; |
||||
|
} |
||||
|
|
||||
|
if (acquired) |
||||
|
{ |
||||
|
return new Releaser(key); |
||||
|
} |
||||
|
|
||||
|
var toDisposeOnFail = DecrementRefAndMaybeRemove(key); |
||||
|
toDisposeOnFail?.Dispose(); |
||||
|
|
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
private static SemaphoreSlim GetOrCreate(object key) |
||||
|
{ |
||||
|
RefCounted<SemaphoreSlim> item; |
||||
|
lock (SemaphoreSlims) |
||||
|
{ |
||||
|
if (SemaphoreSlims.TryGetValue(key, out item!)) |
||||
|
{ |
||||
|
++item.RefCount; |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1)); |
||||
|
SemaphoreSlims[key] = item; |
||||
|
} |
||||
|
} |
||||
|
return item.Value; |
||||
|
} |
||||
|
|
||||
|
private sealed class RefCounted<T>(T value) |
||||
|
{ |
||||
|
public int RefCount { get; set; } = 1; |
||||
|
|
||||
|
public T Value { get; } = value; |
||||
|
} |
||||
|
|
||||
|
private sealed class Releaser(object key) : IDisposable |
||||
|
{ |
||||
|
private int _disposed; |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
if (Interlocked.Exchange(ref _disposed, 1) == 1) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
RefCounted<SemaphoreSlim> item; |
||||
|
var shouldDispose = false; |
||||
|
lock (SemaphoreSlims) |
||||
|
{ |
||||
|
if (!SemaphoreSlims.TryGetValue(key, out item!)) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
--item.RefCount; |
||||
|
if (item.RefCount == 0) |
||||
|
{ |
||||
|
SemaphoreSlims.Remove(key); |
||||
|
shouldDispose = true; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if (shouldDispose) |
||||
|
{ |
||||
|
item.Value.Dispose(); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
item.Value.Release(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static SemaphoreSlim? DecrementRefAndMaybeRemove(object key) |
||||
|
{ |
||||
|
RefCounted<SemaphoreSlim>? itemToDispose = null; |
||||
|
lock (SemaphoreSlims) |
||||
|
{ |
||||
|
if (SemaphoreSlims.TryGetValue(key, out var item)) |
||||
|
{ |
||||
|
--item.RefCount; |
||||
|
if (item.RefCount == 0) |
||||
|
{ |
||||
|
SemaphoreSlims.Remove(key); |
||||
|
itemToDispose = item; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
return itemToDispose?.Value; |
||||
|
} |
||||
|
} |
||||
@ -1,21 +1,20 @@ |
|||||
using System.Threading; |
using System; |
||||
using System.Threading.Tasks; |
using System.Threading.Tasks; |
||||
|
|
||||
namespace Volo.Abp.DistributedLocking |
namespace Volo.Abp.DistributedLocking; |
||||
|
|
||||
|
public class LocalAbpDistributedLockHandle : IAbpDistributedLockHandle |
||||
{ |
{ |
||||
public class LocalAbpDistributedLockHandle : IAbpDistributedLockHandle |
private readonly IDisposable _disposable; |
||||
{ |
|
||||
private readonly SemaphoreSlim _semaphore; |
|
||||
|
|
||||
public LocalAbpDistributedLockHandle(SemaphoreSlim semaphore) |
public LocalAbpDistributedLockHandle(IDisposable disposable) |
||||
{ |
{ |
||||
_semaphore = semaphore; |
_disposable = disposable; |
||||
} |
} |
||||
|
|
||||
public ValueTask DisposeAsync() |
public ValueTask DisposeAsync() |
||||
{ |
{ |
||||
_semaphore.Release(); |
_disposable.Dispose(); |
||||
return default; |
return default; |
||||
} |
|
||||
} |
} |
||||
} |
} |
||||
|
|||||
@ -0,0 +1,17 @@ |
|||||
|
using System; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
|
||||
|
namespace Volo.Abp.DistributedLocking; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// This implementation of <see cref="IAbpDistributedLock"/> does not provide any distributed locking functionality.
|
||||
|
/// Useful in scenarios where distributed locking is not required or during testing.
|
||||
|
/// </summary>
|
||||
|
public class NullAbpDistributedLock : IAbpDistributedLock |
||||
|
{ |
||||
|
public Task<IAbpDistributedLockHandle?> TryAcquireAsync(string name, TimeSpan timeout = default, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
return Task.FromResult<IAbpDistributedLockHandle?>(new LocalAbpDistributedLockHandle(NullDisposable.Instance)); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,179 @@ |
|||||
|
using System; |
||||
|
using System.Diagnostics; |
||||
|
using System.Threading; |
||||
|
using System.Linq; |
||||
|
using System.Threading.Tasks; |
||||
|
using Shouldly; |
||||
|
using Xunit; |
||||
|
|
||||
|
namespace Volo.Abp.Threading; |
||||
|
|
||||
|
public class KeyedLock_Tests |
||||
|
{ |
||||
|
[Fact] |
||||
|
public async Task TryLock_Should_Acquire_Immediately_When_Free() |
||||
|
{ |
||||
|
var key = "key-try-1"; |
||||
|
var handle = await KeyedLock.TryLockAsync(key); |
||||
|
handle.ShouldNotBeNull(); |
||||
|
handle!.Dispose(); |
||||
|
|
||||
|
var handle2 = await KeyedLock.TryLockAsync(key); |
||||
|
handle2.ShouldNotBeNull(); |
||||
|
handle2!.Dispose(); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task TryLock_Should_Return_Null_When_Already_Locked() |
||||
|
{ |
||||
|
var key = "key-try-2"; |
||||
|
using (await KeyedLock.LockAsync(key)) |
||||
|
{ |
||||
|
var handle2 = await KeyedLock.TryLockAsync(key); |
||||
|
handle2.ShouldBeNull(); |
||||
|
} |
||||
|
|
||||
|
var handle3 = await KeyedLock.TryLockAsync(key); |
||||
|
handle3.ShouldNotBeNull(); |
||||
|
handle3!.Dispose(); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task LockAsync_Should_Block_Until_Released() |
||||
|
{ |
||||
|
var key = "key-block-1"; |
||||
|
var sw = Stopwatch.StartNew(); |
||||
|
|
||||
|
Task inner; |
||||
|
using (await KeyedLock.LockAsync(key)) |
||||
|
{ |
||||
|
inner = Task.Run(async () => |
||||
|
{ |
||||
|
using (await KeyedLock.LockAsync(key)) |
||||
|
{ |
||||
|
// Acquired only after outer lock is released
|
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// While holding the outer lock, inner waiter should not complete
|
||||
|
await Task.Delay(200); |
||||
|
inner.IsCompleted.ShouldBeFalse(); |
||||
|
} |
||||
|
|
||||
|
// After releasing, inner should complete; elapsed >= hold time
|
||||
|
await inner; |
||||
|
sw.ElapsedMilliseconds.ShouldBeGreaterThanOrEqualTo(180); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task TryLock_With_Timeout_Should_Return_Null_When_Busy() |
||||
|
{ |
||||
|
var key = "key-timeout-1"; |
||||
|
using (await KeyedLock.LockAsync(key)) |
||||
|
{ |
||||
|
var handle = await KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(50)); |
||||
|
handle.ShouldBeNull(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task TryLock_With_Timeout_Should_Succeed_If_Released_In_Time() |
||||
|
{ |
||||
|
var key = "key-timeout-2"; |
||||
|
// Hold the lock manually
|
||||
|
var outer = await KeyedLock.LockAsync(key); |
||||
|
var tryTask = KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(200)); |
||||
|
await Task.Delay(50); |
||||
|
// Release within the timeout window
|
||||
|
outer.Dispose(); |
||||
|
var handle2 = await tryTask; |
||||
|
handle2.ShouldNotBeNull(); |
||||
|
handle2!.Dispose(); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task LockAsync_With_Cancellation_Should_Rollback_RefCount() |
||||
|
{ |
||||
|
var key = "key-cancel-1"; |
||||
|
var cts = new CancellationTokenSource(); |
||||
|
await cts.CancelAsync(); |
||||
|
await Should.ThrowAsync<OperationCanceledException>(async () => |
||||
|
{ |
||||
|
await KeyedLock.LockAsync(key, cts.Token); |
||||
|
}); |
||||
|
|
||||
|
// After cancellation, we should still be able to acquire the key
|
||||
|
var handle = await KeyedLock.TryLockAsync(key); |
||||
|
handle.ShouldNotBeNull(); |
||||
|
handle!.Dispose(); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task TryLock_With_Cancellation_Should_Rollback() |
||||
|
{ |
||||
|
var key = "key-cancel-2"; |
||||
|
// Ensure it's initially free
|
||||
|
var h0 = await KeyedLock.TryLockAsync(key); |
||||
|
h0?.Dispose(); |
||||
|
|
||||
|
var cts = new CancellationTokenSource(); |
||||
|
await cts.CancelAsync(); |
||||
|
await Should.ThrowAsync<OperationCanceledException>(async () => |
||||
|
{ |
||||
|
await KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(200), cts.Token); |
||||
|
}); |
||||
|
|
||||
|
// After cancellation, the key should be acquirable
|
||||
|
var handle = await KeyedLock.TryLockAsync(key); |
||||
|
handle.ShouldNotBeNull(); |
||||
|
handle!.Dispose(); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Serializes_Access_For_Same_Key() |
||||
|
{ |
||||
|
var key = "key-serial-1"; |
||||
|
int counter = 0; |
||||
|
var tasks = Enumerable.Range(0, 10).Select(async _ => |
||||
|
{ |
||||
|
using (await KeyedLock.LockAsync(key)) |
||||
|
{ |
||||
|
var current = counter; |
||||
|
await Task.Delay(10); |
||||
|
counter = current + 1; |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
await Task.WhenAll(tasks); |
||||
|
counter.ShouldBe(10); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Multiple_Keys_Should_Not_Block_Each_Other() |
||||
|
{ |
||||
|
var key1 = "key-multi-1"; |
||||
|
var key2 = "key-multi-2"; |
||||
|
|
||||
|
using (await KeyedLock.LockAsync(key1)) |
||||
|
{ |
||||
|
var handle2 = await KeyedLock.TryLockAsync(key2); |
||||
|
handle2.ShouldNotBeNull(); |
||||
|
handle2!.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task TryLock_Default_Overload_Delegates_To_Full_Overload() |
||||
|
{ |
||||
|
var key = "key-default-1"; |
||||
|
using (await KeyedLock.LockAsync(key)) |
||||
|
{ |
||||
|
var h1 = await KeyedLock.TryLockAsync(key); |
||||
|
h1.ShouldBeNull(); |
||||
|
} |
||||
|
|
||||
|
var h2 = await KeyedLock.TryLockAsync(key); |
||||
|
h2.ShouldNotBeNull(); |
||||
|
h2!.Dispose(); |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue