From e45259a90c24ec539e2aa6eefd344595631b87ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20=C4=B0brahim=20Kalkan?= Date: Thu, 14 Oct 2021 11:00:05 +0300 Subject: [PATCH] Implemented MedallionAbpDistributedLock --- .../DistributedLocking/IAbpDistributedLock.cs | 2 +- .../LocalAbpDistributedLock.cs | 2 ++ .../AbpDistributedLockHandleExtensions.cs | 14 ++++++++ .../MedallionAbpDistributedLock.cs | 35 +++++++++++++++++++ .../MedallionAbpDistributedLockHandle.cs | 20 +++++++++++ .../Volo/Abp/EventBus/Boxes/InboxProcessor.cs | 15 ++++---- .../Volo/Abp/EventBus/Boxes/OutboxSender.cs | 17 +++++---- 7 files changed, 91 insertions(+), 14 deletions(-) create mode 100644 framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/AbpDistributedLockHandleExtensions.cs create mode 100644 framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLock.cs create mode 100644 framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLockHandle.cs diff --git a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/IAbpDistributedLock.cs b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/IAbpDistributedLock.cs index 15f8921577..6619ea337c 100644 --- a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/IAbpDistributedLock.cs +++ b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/IAbpDistributedLock.cs @@ -18,7 +18,7 @@ namespace Volo.Abp.DistributedLocking /// Cancellation token [ItemCanBeNull] Task TryAcquireAsync( - string name, + [NotNull] string name, TimeSpan timeout = default, CancellationToken cancellationToken = default ); diff --git a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs index 7a34106b0d..4b481d3549 100644 --- a/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs +++ b/framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs @@ -15,6 +15,8 @@ namespace Volo.Abp.DistributedLocking TimeSpan timeout = default, CancellationToken cancellationToken = default) { + Check.NotNullOrWhiteSpace(name, nameof(name)); + var semaphore = _localSyncObjects.GetOrAdd(name, _ => new SemaphoreSlim(1, 1)); if (!await semaphore.WaitAsync(timeout, cancellationToken)) diff --git a/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/AbpDistributedLockHandleExtensions.cs b/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/AbpDistributedLockHandleExtensions.cs new file mode 100644 index 0000000000..f0b69116f5 --- /dev/null +++ b/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/AbpDistributedLockHandleExtensions.cs @@ -0,0 +1,14 @@ +using System; +using Medallion.Threading; + +namespace Volo.Abp.DistributedLocking +{ + public static class AbpDistributedLockHandleExtensions + { + public static IDistributedSynchronizationHandle ToDistributedSynchronizationHandle( + this IAbpDistributedLockHandle handle) + { + return handle.As().Handle; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLock.cs b/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLock.cs new file mode 100644 index 0000000000..4b46096da9 --- /dev/null +++ b/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLock.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Medallion.Threading; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.DistributedLocking +{ + [Dependency(ReplaceServices = true)] + public class MedallionAbpDistributedLock : IAbpDistributedLock, ITransientDependency + { + protected IDistributedLockProvider DistributedLockProvider { get; } + + public MedallionAbpDistributedLock(IDistributedLockProvider distributedLockProvider) + { + DistributedLockProvider = distributedLockProvider; + } + + public async Task TryAcquireAsync( + string name, + TimeSpan timeout = default, + CancellationToken cancellationToken = default) + { + Check.NotNullOrWhiteSpace(name, nameof(name)); + + var handle = await DistributedLockProvider.TryAcquireLockAsync(name, timeout, cancellationToken); + if (handle == null) + { + return null; + } + + return new MedallionAbpDistributedLockHandle(handle); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLockHandle.cs b/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLockHandle.cs new file mode 100644 index 0000000000..d22371b862 --- /dev/null +++ b/framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLockHandle.cs @@ -0,0 +1,20 @@ +using System.Threading.Tasks; +using Medallion.Threading; + +namespace Volo.Abp.DistributedLocking +{ + public class MedallionAbpDistributedLockHandle : IAbpDistributedLockHandle + { + public IDistributedSynchronizationHandle Handle { get; } + + public MedallionAbpDistributedLockHandle(IDistributedSynchronizationHandle handle) + { + Handle = handle; + } + + public ValueTask DisposeAsync() + { + return Handle.DisposeAsync(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs index e3c71771ad..f381ca9644 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; +using Volo.Abp.DistributedLocking; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Threading; using Volo.Abp.Timing; @@ -19,7 +20,7 @@ namespace Volo.Abp.EventBus.Boxes protected IServiceProvider ServiceProvider { get; } protected AbpAsyncTimer Timer { get; } protected IDistributedEventBus DistributedEventBus { get; } - protected IDistributedLockProvider DistributedLockProvider { get; } + protected IAbpDistributedLock DistributedLock { get; } protected IUnitOfWorkManager UnitOfWorkManager { get; } protected IClock Clock { get; } protected IEventInbox Inbox { get; private set; } @@ -37,7 +38,7 @@ namespace Volo.Abp.EventBus.Boxes IServiceProvider serviceProvider, AbpAsyncTimer timer, IDistributedEventBus distributedEventBus, - IDistributedLockProvider distributedLockProvider, + IAbpDistributedLock distributedLock, IUnitOfWorkManager unitOfWorkManager, IClock clock, IOptions eventBusBoxesOptions) @@ -45,11 +46,11 @@ namespace Volo.Abp.EventBus.Boxes ServiceProvider = serviceProvider; Timer = timer; DistributedEventBus = distributedEventBus; - DistributedLockProvider = distributedLockProvider; + DistributedLock = distributedLock; UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; - Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; + Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds); Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -84,7 +85,7 @@ namespace Volo.Abp.EventBus.Boxes return; } - await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) + await using (var handle = await DistributedLock.TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) { @@ -120,7 +121,9 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); + await TaskDelayHelper.DelayAsync( + Convert.ToInt32(EventBusBoxesOptions.DistributedLockWaitDuration.TotalMilliseconds), + StoppingToken); } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index 32545227c3..e9e65118a3 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; +using Volo.Abp.DistributedLocking; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Threading; @@ -17,7 +18,7 @@ namespace Volo.Abp.EventBus.Boxes protected IServiceProvider ServiceProvider { get; } protected AbpAsyncTimer Timer { get; } protected IDistributedEventBus DistributedEventBus { get; } - protected IDistributedLockProvider DistributedLockProvider { get; } + protected IAbpDistributedLock DistributedLock { get; } protected IEventOutbox Outbox { get; private set; } protected OutboxConfig OutboxConfig { get; private set; } protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } @@ -31,15 +32,15 @@ namespace Volo.Abp.EventBus.Boxes IServiceProvider serviceProvider, AbpAsyncTimer timer, IDistributedEventBus distributedEventBus, - IDistributedLockProvider distributedLockProvider, + IAbpDistributedLock distributedLock, IOptions eventBusBoxesOptions) { ServiceProvider = serviceProvider; - Timer = timer; DistributedEventBus = distributedEventBus; - DistributedLockProvider = distributedLockProvider; + DistributedLock = distributedLock; EventBusBoxesOptions = eventBusBoxesOptions.Value; - Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; + Timer = timer; + Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds); Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -69,7 +70,7 @@ namespace Volo.Abp.EventBus.Boxes protected virtual async Task RunAsync() { - await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) + await using (var handle = await DistributedLock.TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) { @@ -100,7 +101,9 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); + await TaskDelayHelper.DelayAsync( + Convert.ToInt32(EventBusBoxesOptions.DistributedLockWaitDuration.TotalMilliseconds), + StoppingToken); } } }