Browse Source

Make sure the `existingConnection` is in the `Open` state and use `IAsyncDisposable`.

pull/22579/head
maliming 1 year ago
parent
commit
773cdb26b2
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 16
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs
  2. 16
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs
  3. 55
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs
  4. 2
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs
  5. 2
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs

16
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs

@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Json;
using Volo.Abp.Modularity;
using Volo.Abp.Threading;
@ -26,12 +27,17 @@ public class AbpRabbitMqModule : AbpModule
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
context.ServiceProvider
AsyncHelper.RunSync(() => OnApplicationShutdownAsync(context));
}
public async override Task OnApplicationShutdownAsync(ApplicationShutdownContext context)
{
await context.ServiceProvider
.GetRequiredService<IChannelPool>()
.Dispose();
.DisposeAsync();
context.ServiceProvider
await context.ServiceProvider
.GetRequiredService<IConnectionPool>()
.Dispose();
.DisposeAsync();
}
}

16
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs

@ -65,7 +65,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency
if (poolItem.Channel.IsClosed)
{
poolItem.Dispose();
await poolItem.DisposeAsync();
Channels.TryRemove(channelName, out _);
using (await Semaphore.LockAsync())
@ -106,7 +106,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency
}
}
public void Dispose()
public async ValueTask DisposeAsync()
{
if (IsDisposed)
{
@ -134,10 +134,12 @@ public class ChannelPool : IChannelPool, ISingletonDependency
try
{
poolItem.WaitIfInUse(remainingWaitDuration);
poolItem.Dispose();
await poolItem.DisposeAsync();
}
catch
{ }
{
// ignored
}
poolItemDisposeStopwatch.Stop();
@ -158,7 +160,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency
Channels.Clear();
}
protected class ChannelPoolItem : IDisposable
protected class ChannelPoolItem : IAsyncDisposable
{
public IChannel Channel { get; }
@ -208,9 +210,9 @@ public class ChannelPool : IChannelPool, ISingletonDependency
}
}
public void Dispose()
public async ValueTask DisposeAsync()
{
Channel.Dispose();
await Channel.DisposeAsync();
}
}

55
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

@ -27,48 +27,25 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
public virtual async Task<IConnection> GetAsync(string? connectionName = null)
{
connectionName ??= RabbitMqConnections.DefaultConnectionName;
IConnection connection;
if (Connections.TryGetValue(connectionName, out var existingConnection))
using (await Semaphore.LockAsync())
{
connection = existingConnection;
}
else
{
using (await Semaphore.LockAsync())
connectionName ??= RabbitMqConnections.DefaultConnectionName;
if (Connections.TryGetValue(connectionName, out var existingConnection) && existingConnection.IsOpen)
{
try
{
var connectionFactory = Options.Connections.GetOrDefault(connectionName);
if (Connections.TryGetValue(connectionName, out var existingConnection2))
{
connection = existingConnection2;
}
else
{
connection = await GetConnectionAsync(connectionName, connectionFactory);
Connections.TryAdd(connectionName, connection);
return existingConnection;
}
if (!connection.IsOpen)
{
connection.Dispose();
Connections.TryRemove(connectionName, out _);
connection = await GetConnectionAsync(connectionName, connectionFactory);
Connections.TryAdd(connectionName, connection);
}
}
}
catch (Exception)
{
Connections.TryRemove(connectionName, out _);
throw;
}
if(existingConnection != null)
{
await existingConnection.DisposeAsync();
}
}
return connection;
var connectionFactory = Options.Connections.GetOrDefault(connectionName);
var connection = await GetConnectionAsync(connectionName, connectionFactory);
Connections[connectionName] = connection;
return connection;
}
}
protected virtual async Task<IConnection> GetConnectionAsync(string connectionName, ConnectionFactory connectionFactory)
@ -80,7 +57,7 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
: await connectionFactory.CreateConnectionAsync(hostnames);
}
public void Dispose()
public async ValueTask DisposeAsync()
{
if (_isDisposed)
{
@ -93,7 +70,7 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
{
try
{
connection.Dispose();
await connection.DisposeAsync();
}
catch
{

2
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs

@ -3,7 +3,7 @@ using System.Threading.Tasks;
namespace Volo.Abp.RabbitMQ;
public interface IChannelPool : IDisposable
public interface IChannelPool : IAsyncDisposable
{
Task<IChannelAccessor> AcquireAsync(string? channelName = null, string? connectionName = null);
}

2
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs

@ -4,7 +4,7 @@ using RabbitMQ.Client;
namespace Volo.Abp.RabbitMQ;
public interface IConnectionPool : IDisposable
public interface IConnectionPool : IAsyncDisposable
{
Task<IConnection> GetAsync(string? connectionName = null);
}

Loading…
Cancel
Save