mirror of https://github.com/abpframework/abp.git
committed by
GitHub
6 changed files with 462 additions and 20 deletions
@ -0,0 +1,242 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.Threading; |
|||
|
|||
/// <summary>
|
|||
/// Per-key asynchronous lock for coordinating concurrent flows.
|
|||
/// </summary>
|
|||
/// <remarks>
|
|||
/// Based on the pattern described in https://stackoverflow.com/a/31194647.
|
|||
/// Use within a <c>using</c> scope to ensure the lock is released via <c>IDisposable.Dispose()</c>.
|
|||
/// </remarks>
|
|||
public static class KeyedLock |
|||
{ |
|||
private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims = new(); |
|||
|
|||
/// <summary>
|
|||
/// Acquires an exclusive asynchronous lock for the specified <paramref name="key"/>.
|
|||
/// This method waits until the lock becomes available.
|
|||
/// </summary>
|
|||
/// <param name="key">A non-null object that identifies the lock. Objects considered equal by dictionary semantics will share the same lock.</param>
|
|||
/// <returns>An <see cref="IDisposable"/> handle that must be disposed to release the lock.</returns>
|
|||
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
|||
/// <example>
|
|||
/// <code>
|
|||
/// var key = "my-critical-section";
|
|||
/// using (await KeyedLock.LockAsync(key))
|
|||
/// {
|
|||
/// // protected work
|
|||
/// }
|
|||
/// </code>
|
|||
/// </example>
|
|||
public static async Task<IDisposable> LockAsync(object key) |
|||
{ |
|||
Check.NotNull(key, nameof(key)); |
|||
return await LockAsync(key, CancellationToken.None); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Acquires an exclusive asynchronous lock for the specified <paramref name="key"/>, observing a <paramref name="cancellationToken"/>.
|
|||
/// </summary>
|
|||
/// <param name="key">A non-null object that identifies the lock. Objects considered equal by dictionary semantics will share the same lock.</param>
|
|||
/// <param name="cancellationToken">A token to cancel the wait for the lock.</param>
|
|||
/// <returns>An <see cref="IDisposable"/> handle that must be disposed to release the lock.</returns>
|
|||
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
|||
/// <exception cref="OperationCanceledException">Thrown if the wait is canceled via <paramref name="cancellationToken"/>.</exception>
|
|||
/// <example>
|
|||
/// <code>
|
|||
/// var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|||
/// using (await KeyedLock.LockAsync("db-update", cts.Token))
|
|||
/// {
|
|||
/// // protected work
|
|||
/// }
|
|||
/// </code>
|
|||
/// </example>
|
|||
public static async Task<IDisposable> LockAsync(object key, CancellationToken cancellationToken) |
|||
{ |
|||
Check.NotNull(key, nameof(key)); |
|||
var semaphore = GetOrCreate(key); |
|||
try |
|||
{ |
|||
await semaphore.WaitAsync(cancellationToken); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
var toDispose = DecrementRefAndMaybeRemove(key); |
|||
toDispose?.Dispose(); |
|||
throw; |
|||
} |
|||
return new Releaser(key); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Attempts to acquire an exclusive lock for the specified <paramref name="key"/> without waiting.
|
|||
/// </summary>
|
|||
/// <param name="key">A non-null object that identifies the lock.</param>
|
|||
/// <returns>
|
|||
/// An <see cref="IDisposable"/> handle if the lock was immediately acquired; otherwise <see langword="null"/>.
|
|||
/// </returns>
|
|||
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
|||
/// <example>
|
|||
/// <code>
|
|||
/// var handle = await KeyedLock.TryLockAsync("cache-key");
|
|||
/// if (handle != null)
|
|||
/// {
|
|||
/// using (handle)
|
|||
/// {
|
|||
/// // protected work
|
|||
/// }
|
|||
/// }
|
|||
/// </code>
|
|||
/// </example>
|
|||
public static async Task<IDisposable?> TryLockAsync(object key) |
|||
{ |
|||
Check.NotNull(key, nameof(key)); |
|||
return await TryLockAsync(key, default, CancellationToken.None); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Attempts to acquire an exclusive lock for the specified <paramref name="key"/>, waiting up to <paramref name="timeout"/>.
|
|||
/// </summary>
|
|||
/// <param name="key">A non-null object that identifies the lock.</param>
|
|||
/// <param name="timeout">Maximum time to wait for the lock. If set to <see cref="TimeSpan.Zero"/>, the method performs an immediate, non-blocking attempt.</param>
|
|||
/// <param name="cancellationToken">A token to cancel the wait.</param>
|
|||
/// <returns>
|
|||
/// An <see cref="IDisposable"/> handle if the lock was acquired within the timeout; otherwise <see langword="null"/>.
|
|||
/// </returns>
|
|||
/// <exception cref="ArgumentNullException">Thrown when <paramref name="key"/> is <see langword="null"/>.</exception>
|
|||
/// <exception cref="OperationCanceledException">Thrown if the wait is canceled via <paramref name="cancellationToken"/>.</exception>
|
|||
/// <example>
|
|||
/// <code>
|
|||
/// var handle = await KeyedLock.TryLockAsync("send-mail", TimeSpan.FromSeconds(1));
|
|||
/// if (handle != null)
|
|||
/// {
|
|||
/// using (handle)
|
|||
/// {
|
|||
/// // protected work
|
|||
/// }
|
|||
/// }
|
|||
/// else
|
|||
/// {
|
|||
/// // lock not acquired within timeout
|
|||
/// }
|
|||
/// </code>
|
|||
/// </example>
|
|||
public static async Task<IDisposable?> TryLockAsync(object key, TimeSpan timeout, CancellationToken cancellationToken = default) |
|||
{ |
|||
Check.NotNull(key, nameof(key)); |
|||
var semaphore = GetOrCreate(key); |
|||
bool acquired; |
|||
try |
|||
{ |
|||
if (timeout == default) |
|||
{ |
|||
acquired = await semaphore.WaitAsync(0, cancellationToken); |
|||
} |
|||
else |
|||
{ |
|||
acquired = await semaphore.WaitAsync(timeout, cancellationToken); |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
var toDispose = DecrementRefAndMaybeRemove(key); |
|||
toDispose?.Dispose(); |
|||
throw; |
|||
} |
|||
|
|||
if (acquired) |
|||
{ |
|||
return new Releaser(key); |
|||
} |
|||
|
|||
var toDisposeOnFail = DecrementRefAndMaybeRemove(key); |
|||
toDisposeOnFail?.Dispose(); |
|||
|
|||
return null; |
|||
} |
|||
|
|||
private static SemaphoreSlim GetOrCreate(object key) |
|||
{ |
|||
RefCounted<SemaphoreSlim> item; |
|||
lock (SemaphoreSlims) |
|||
{ |
|||
if (SemaphoreSlims.TryGetValue(key, out item!)) |
|||
{ |
|||
++item.RefCount; |
|||
} |
|||
else |
|||
{ |
|||
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1)); |
|||
SemaphoreSlims[key] = item; |
|||
} |
|||
} |
|||
return item.Value; |
|||
} |
|||
|
|||
private sealed class RefCounted<T>(T value) |
|||
{ |
|||
public int RefCount { get; set; } = 1; |
|||
|
|||
public T Value { get; } = value; |
|||
} |
|||
|
|||
private sealed class Releaser(object key) : IDisposable |
|||
{ |
|||
private int _disposed; |
|||
|
|||
public void Dispose() |
|||
{ |
|||
if (Interlocked.Exchange(ref _disposed, 1) == 1) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
RefCounted<SemaphoreSlim> item; |
|||
var shouldDispose = false; |
|||
lock (SemaphoreSlims) |
|||
{ |
|||
if (!SemaphoreSlims.TryGetValue(key, out item!)) |
|||
{ |
|||
return; |
|||
} |
|||
--item.RefCount; |
|||
if (item.RefCount == 0) |
|||
{ |
|||
SemaphoreSlims.Remove(key); |
|||
shouldDispose = true; |
|||
} |
|||
} |
|||
|
|||
if (shouldDispose) |
|||
{ |
|||
item.Value.Dispose(); |
|||
} |
|||
else |
|||
{ |
|||
item.Value.Release(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private static SemaphoreSlim? DecrementRefAndMaybeRemove(object key) |
|||
{ |
|||
RefCounted<SemaphoreSlim>? itemToDispose = null; |
|||
lock (SemaphoreSlims) |
|||
{ |
|||
if (SemaphoreSlims.TryGetValue(key, out var item)) |
|||
{ |
|||
--item.RefCount; |
|||
if (item.RefCount == 0) |
|||
{ |
|||
SemaphoreSlims.Remove(key); |
|||
itemToDispose = item; |
|||
} |
|||
} |
|||
} |
|||
return itemToDispose?.Value; |
|||
} |
|||
} |
|||
@ -1,21 +1,20 @@ |
|||
using System.Threading; |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.DistributedLocking |
|||
namespace Volo.Abp.DistributedLocking; |
|||
|
|||
public class LocalAbpDistributedLockHandle : IAbpDistributedLockHandle |
|||
{ |
|||
public class LocalAbpDistributedLockHandle : IAbpDistributedLockHandle |
|||
{ |
|||
private readonly SemaphoreSlim _semaphore; |
|||
private readonly IDisposable _disposable; |
|||
|
|||
public LocalAbpDistributedLockHandle(SemaphoreSlim semaphore) |
|||
{ |
|||
_semaphore = semaphore; |
|||
} |
|||
public LocalAbpDistributedLockHandle(IDisposable disposable) |
|||
{ |
|||
_disposable = disposable; |
|||
} |
|||
|
|||
public ValueTask DisposeAsync() |
|||
{ |
|||
_semaphore.Release(); |
|||
return default; |
|||
} |
|||
public ValueTask DisposeAsync() |
|||
{ |
|||
_disposable.Dispose(); |
|||
return default; |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,17 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.DistributedLocking; |
|||
|
|||
/// <summary>
|
|||
/// This implementation of <see cref="IAbpDistributedLock"/> does not provide any distributed locking functionality.
|
|||
/// Useful in scenarios where distributed locking is not required or during testing.
|
|||
/// </summary>
|
|||
public class NullAbpDistributedLock : IAbpDistributedLock |
|||
{ |
|||
public Task<IAbpDistributedLockHandle?> TryAcquireAsync(string name, TimeSpan timeout = default, CancellationToken cancellationToken = default) |
|||
{ |
|||
return Task.FromResult<IAbpDistributedLockHandle?>(new LocalAbpDistributedLockHandle(NullDisposable.Instance)); |
|||
} |
|||
} |
|||
@ -0,0 +1,179 @@ |
|||
using System; |
|||
using System.Diagnostics; |
|||
using System.Threading; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Shouldly; |
|||
using Xunit; |
|||
|
|||
namespace Volo.Abp.Threading; |
|||
|
|||
public class KeyedLock_Tests |
|||
{ |
|||
[Fact] |
|||
public async Task TryLock_Should_Acquire_Immediately_When_Free() |
|||
{ |
|||
var key = "key-try-1"; |
|||
var handle = await KeyedLock.TryLockAsync(key); |
|||
handle.ShouldNotBeNull(); |
|||
handle!.Dispose(); |
|||
|
|||
var handle2 = await KeyedLock.TryLockAsync(key); |
|||
handle2.ShouldNotBeNull(); |
|||
handle2!.Dispose(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task TryLock_Should_Return_Null_When_Already_Locked() |
|||
{ |
|||
var key = "key-try-2"; |
|||
using (await KeyedLock.LockAsync(key)) |
|||
{ |
|||
var handle2 = await KeyedLock.TryLockAsync(key); |
|||
handle2.ShouldBeNull(); |
|||
} |
|||
|
|||
var handle3 = await KeyedLock.TryLockAsync(key); |
|||
handle3.ShouldNotBeNull(); |
|||
handle3!.Dispose(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task LockAsync_Should_Block_Until_Released() |
|||
{ |
|||
var key = "key-block-1"; |
|||
var sw = Stopwatch.StartNew(); |
|||
|
|||
Task inner; |
|||
using (await KeyedLock.LockAsync(key)) |
|||
{ |
|||
inner = Task.Run(async () => |
|||
{ |
|||
using (await KeyedLock.LockAsync(key)) |
|||
{ |
|||
// Acquired only after outer lock is released
|
|||
} |
|||
}); |
|||
|
|||
// While holding the outer lock, inner waiter should not complete
|
|||
await Task.Delay(200); |
|||
inner.IsCompleted.ShouldBeFalse(); |
|||
} |
|||
|
|||
// After releasing, inner should complete; elapsed >= hold time
|
|||
await inner; |
|||
sw.ElapsedMilliseconds.ShouldBeGreaterThanOrEqualTo(180); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task TryLock_With_Timeout_Should_Return_Null_When_Busy() |
|||
{ |
|||
var key = "key-timeout-1"; |
|||
using (await KeyedLock.LockAsync(key)) |
|||
{ |
|||
var handle = await KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(50)); |
|||
handle.ShouldBeNull(); |
|||
} |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task TryLock_With_Timeout_Should_Succeed_If_Released_In_Time() |
|||
{ |
|||
var key = "key-timeout-2"; |
|||
// Hold the lock manually
|
|||
var outer = await KeyedLock.LockAsync(key); |
|||
var tryTask = KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(200)); |
|||
await Task.Delay(50); |
|||
// Release within the timeout window
|
|||
outer.Dispose(); |
|||
var handle2 = await tryTask; |
|||
handle2.ShouldNotBeNull(); |
|||
handle2!.Dispose(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task LockAsync_With_Cancellation_Should_Rollback_RefCount() |
|||
{ |
|||
var key = "key-cancel-1"; |
|||
var cts = new CancellationTokenSource(); |
|||
await cts.CancelAsync(); |
|||
await Should.ThrowAsync<OperationCanceledException>(async () => |
|||
{ |
|||
await KeyedLock.LockAsync(key, cts.Token); |
|||
}); |
|||
|
|||
// After cancellation, we should still be able to acquire the key
|
|||
var handle = await KeyedLock.TryLockAsync(key); |
|||
handle.ShouldNotBeNull(); |
|||
handle!.Dispose(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task TryLock_With_Cancellation_Should_Rollback() |
|||
{ |
|||
var key = "key-cancel-2"; |
|||
// Ensure it's initially free
|
|||
var h0 = await KeyedLock.TryLockAsync(key); |
|||
h0?.Dispose(); |
|||
|
|||
var cts = new CancellationTokenSource(); |
|||
await cts.CancelAsync(); |
|||
await Should.ThrowAsync<OperationCanceledException>(async () => |
|||
{ |
|||
await KeyedLock.TryLockAsync(key, TimeSpan.FromMilliseconds(200), cts.Token); |
|||
}); |
|||
|
|||
// After cancellation, the key should be acquirable
|
|||
var handle = await KeyedLock.TryLockAsync(key); |
|||
handle.ShouldNotBeNull(); |
|||
handle!.Dispose(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Serializes_Access_For_Same_Key() |
|||
{ |
|||
var key = "key-serial-1"; |
|||
int counter = 0; |
|||
var tasks = Enumerable.Range(0, 10).Select(async _ => |
|||
{ |
|||
using (await KeyedLock.LockAsync(key)) |
|||
{ |
|||
var current = counter; |
|||
await Task.Delay(10); |
|||
counter = current + 1; |
|||
} |
|||
}); |
|||
|
|||
await Task.WhenAll(tasks); |
|||
counter.ShouldBe(10); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Multiple_Keys_Should_Not_Block_Each_Other() |
|||
{ |
|||
var key1 = "key-multi-1"; |
|||
var key2 = "key-multi-2"; |
|||
|
|||
using (await KeyedLock.LockAsync(key1)) |
|||
{ |
|||
var handle2 = await KeyedLock.TryLockAsync(key2); |
|||
handle2.ShouldNotBeNull(); |
|||
handle2!.Dispose(); |
|||
} |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task TryLock_Default_Overload_Delegates_To_Full_Overload() |
|||
{ |
|||
var key = "key-default-1"; |
|||
using (await KeyedLock.LockAsync(key)) |
|||
{ |
|||
var h1 = await KeyedLock.TryLockAsync(key); |
|||
h1.ShouldBeNull(); |
|||
} |
|||
|
|||
var h2 = await KeyedLock.TryLockAsync(key); |
|||
h2.ShouldNotBeNull(); |
|||
h2!.Dispose(); |
|||
} |
|||
} |
|||
Loading…
Reference in new issue