Browse Source

Implemented MedallionAbpDistributedLock

pull/10325/head
Halil İbrahim Kalkan 4 years ago
parent
commit
e45259a90c
  1. 2
      framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/IAbpDistributedLock.cs
  2. 2
      framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs
  3. 14
      framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/AbpDistributedLockHandleExtensions.cs
  4. 35
      framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLock.cs
  5. 20
      framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLockHandle.cs
  6. 15
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs
  7. 17
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs

2
framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/IAbpDistributedLock.cs

@ -18,7 +18,7 @@ namespace Volo.Abp.DistributedLocking
/// <param name="cancellationToken">Cancellation token</param>
[ItemCanBeNull]
Task<IAbpDistributedLockHandle> TryAcquireAsync(
string name,
[NotNull] string name,
TimeSpan timeout = default,
CancellationToken cancellationToken = default
);

2
framework/src/Volo.Abp.DistributedLocking.Abstractions/Volo/Abp/DistributedLocking/LocalAbpDistributedLock.cs

@ -15,6 +15,8 @@ namespace Volo.Abp.DistributedLocking
TimeSpan timeout = default,
CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(name, nameof(name));
var semaphore = _localSyncObjects.GetOrAdd(name, _ => new SemaphoreSlim(1, 1));
if (!await semaphore.WaitAsync(timeout, cancellationToken))

14
framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/AbpDistributedLockHandleExtensions.cs

@ -0,0 +1,14 @@
using System;
using Medallion.Threading;
namespace Volo.Abp.DistributedLocking
{
public static class AbpDistributedLockHandleExtensions
{
public static IDistributedSynchronizationHandle ToDistributedSynchronizationHandle(
this IAbpDistributedLockHandle handle)
{
return handle.As<MedallionAbpDistributedLockHandle>().Handle;
}
}
}

35
framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLock.cs

@ -0,0 +1,35 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Medallion.Threading;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.DistributedLocking
{
[Dependency(ReplaceServices = true)]
public class MedallionAbpDistributedLock : IAbpDistributedLock, ITransientDependency
{
protected IDistributedLockProvider DistributedLockProvider { get; }
public MedallionAbpDistributedLock(IDistributedLockProvider distributedLockProvider)
{
DistributedLockProvider = distributedLockProvider;
}
public async Task<IAbpDistributedLockHandle> TryAcquireAsync(
string name,
TimeSpan timeout = default,
CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(name, nameof(name));
var handle = await DistributedLockProvider.TryAcquireLockAsync(name, timeout, cancellationToken);
if (handle == null)
{
return null;
}
return new MedallionAbpDistributedLockHandle(handle);
}
}
}

20
framework/src/Volo.Abp.DistributedLocking/Volo/Abp/DistributedLocking/MedallionAbpDistributedLockHandle.cs

@ -0,0 +1,20 @@
using System.Threading.Tasks;
using Medallion.Threading;
namespace Volo.Abp.DistributedLocking
{
public class MedallionAbpDistributedLockHandle : IAbpDistributedLockHandle
{
public IDistributedSynchronizationHandle Handle { get; }
public MedallionAbpDistributedLockHandle(IDistributedSynchronizationHandle handle)
{
Handle = handle;
}
public ValueTask DisposeAsync()
{
return Handle.DisposeAsync();
}
}
}

15
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs

@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
@ -19,7 +20,7 @@ namespace Volo.Abp.EventBus.Boxes
protected IServiceProvider ServiceProvider { get; }
protected AbpAsyncTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IDistributedLockProvider DistributedLockProvider { get; }
protected IAbpDistributedLock DistributedLock { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IClock Clock { get; }
protected IEventInbox Inbox { get; private set; }
@ -37,7 +38,7 @@ namespace Volo.Abp.EventBus.Boxes
IServiceProvider serviceProvider,
AbpAsyncTimer timer,
IDistributedEventBus distributedEventBus,
IDistributedLockProvider distributedLockProvider,
IAbpDistributedLock distributedLock,
IUnitOfWorkManager unitOfWorkManager,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
@ -45,11 +46,11 @@ namespace Volo.Abp.EventBus.Boxes
ServiceProvider = serviceProvider;
Timer = timer;
DistributedEventBus = distributedEventBus;
DistributedLockProvider = distributedLockProvider;
DistributedLock = distributedLock;
UnitOfWorkManager = unitOfWorkManager;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds;
Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds);
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
StoppingTokenSource = new CancellationTokenSource();
@ -84,7 +85,7 @@ namespace Volo.Abp.EventBus.Boxes
return;
}
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken))
await using (var handle = await DistributedLock.TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
{
@ -120,7 +121,9 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken);
await TaskDelayHelper.DelayAsync(
Convert.ToInt32(EventBusBoxesOptions.DistributedLockWaitDuration.TotalMilliseconds),
StoppingToken);
}
}
}

17
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs

@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading;
@ -17,7 +18,7 @@ namespace Volo.Abp.EventBus.Boxes
protected IServiceProvider ServiceProvider { get; }
protected AbpAsyncTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IDistributedLockProvider DistributedLockProvider { get; }
protected IAbpDistributedLock DistributedLock { get; }
protected IEventOutbox Outbox { get; private set; }
protected OutboxConfig OutboxConfig { get; private set; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
@ -31,15 +32,15 @@ namespace Volo.Abp.EventBus.Boxes
IServiceProvider serviceProvider,
AbpAsyncTimer timer,
IDistributedEventBus distributedEventBus,
IDistributedLockProvider distributedLockProvider,
IAbpDistributedLock distributedLock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
ServiceProvider = serviceProvider;
Timer = timer;
DistributedEventBus = distributedEventBus;
DistributedLockProvider = distributedLockProvider;
DistributedLock = distributedLock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds;
Timer = timer;
Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds);
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<OutboxSender>.Instance;
StoppingTokenSource = new CancellationTokenSource();
@ -69,7 +70,7 @@ namespace Volo.Abp.EventBus.Boxes
protected virtual async Task RunAsync()
{
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken))
await using (var handle = await DistributedLock.TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
{
@ -100,7 +101,9 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken);
await TaskDelayHelper.DelayAsync(
Convert.ToInt32(EventBusBoxesOptions.DistributedLockWaitDuration.TotalMilliseconds),
StoppingToken);
}
}
}

Loading…
Cancel
Save