diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs index 096ba83a3a..74b41dea23 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; using Volo.Abp.Json; using Volo.Abp.Modularity; using Volo.Abp.Threading; @@ -26,12 +27,17 @@ public class AbpRabbitMqModule : AbpModule public override void OnApplicationShutdown(ApplicationShutdownContext context) { - context.ServiceProvider + AsyncHelper.RunSync(() => OnApplicationShutdownAsync(context)); + } + + public async override Task OnApplicationShutdownAsync(ApplicationShutdownContext context) + { + await context.ServiceProvider .GetRequiredService() - .Dispose(); + .DisposeAsync(); - context.ServiceProvider + await context.ServiceProvider .GetRequiredService() - .Dispose(); + .DisposeAsync(); } } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs index bbb594a5f7..03d6029b69 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs @@ -65,7 +65,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency if (poolItem.Channel.IsClosed) { - poolItem.Dispose(); + await poolItem.DisposeAsync(); Channels.TryRemove(channelName, out _); using (await Semaphore.LockAsync()) @@ -106,7 +106,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency } } - public void Dispose() + public async ValueTask DisposeAsync() { if (IsDisposed) { @@ -134,10 +134,12 @@ public class ChannelPool : IChannelPool, ISingletonDependency try { poolItem.WaitIfInUse(remainingWaitDuration); - poolItem.Dispose(); + await poolItem.DisposeAsync(); } catch - { } + { + // ignored + } poolItemDisposeStopwatch.Stop(); @@ -158,7 +160,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency Channels.Clear(); } - protected class ChannelPoolItem : IDisposable + protected class ChannelPoolItem : IAsyncDisposable { public IChannel Channel { get; } @@ -208,9 +210,9 @@ public class ChannelPool : IChannelPool, ISingletonDependency } } - public void Dispose() + public async ValueTask DisposeAsync() { - Channel.Dispose(); + await Channel.DisposeAsync(); } } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index 938b0e4f8c..888de64613 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -27,48 +27,25 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency public virtual async Task GetAsync(string? connectionName = null) { - connectionName ??= RabbitMqConnections.DefaultConnectionName; - - IConnection connection; - - if (Connections.TryGetValue(connectionName, out var existingConnection)) + using (await Semaphore.LockAsync()) { - connection = existingConnection; - } - else - { - using (await Semaphore.LockAsync()) + connectionName ??= RabbitMqConnections.DefaultConnectionName; + + if (Connections.TryGetValue(connectionName, out var existingConnection) && existingConnection.IsOpen) { - try - { - var connectionFactory = Options.Connections.GetOrDefault(connectionName); - if (Connections.TryGetValue(connectionName, out var existingConnection2)) - { - connection = existingConnection2; - } - else - { - connection = await GetConnectionAsync(connectionName, connectionFactory); - Connections.TryAdd(connectionName, connection); + return existingConnection; + } - if (!connection.IsOpen) - { - connection.Dispose(); - Connections.TryRemove(connectionName, out _); - connection = await GetConnectionAsync(connectionName, connectionFactory); - Connections.TryAdd(connectionName, connection); - } - } - } - catch (Exception) - { - Connections.TryRemove(connectionName, out _); - throw; - } + if(existingConnection != null) + { + await existingConnection.DisposeAsync(); } - } - return connection; + var connectionFactory = Options.Connections.GetOrDefault(connectionName); + var connection = await GetConnectionAsync(connectionName, connectionFactory); + Connections[connectionName] = connection; + return connection; + } } protected virtual async Task GetConnectionAsync(string connectionName, ConnectionFactory connectionFactory) @@ -80,7 +57,7 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency : await connectionFactory.CreateConnectionAsync(hostnames); } - public void Dispose() + public async ValueTask DisposeAsync() { if (_isDisposed) { @@ -93,7 +70,7 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency { try { - connection.Dispose(); + await connection.DisposeAsync(); } catch { diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs index 06b1cc0ba6..5a9611ad86 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; namespace Volo.Abp.RabbitMQ; -public interface IChannelPool : IDisposable +public interface IChannelPool : IAsyncDisposable { Task AcquireAsync(string? channelName = null, string? connectionName = null); } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs index cca44eea89..02fdf286d9 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs @@ -4,7 +4,7 @@ using RabbitMQ.Client; namespace Volo.Abp.RabbitMQ; -public interface IConnectionPool : IDisposable +public interface IConnectionPool : IAsyncDisposable { Task GetAsync(string? connectionName = null); }