diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs index cf88778089..1dbe4a6dcc 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs @@ -122,7 +122,7 @@ public class JobQueue : IJobQueue protected virtual Task EnsureInitializedAsync() { - if (ChannelAccessor != null) + if (ChannelAccessor != null && ChannelAccessor.Channel.IsOpen) { return Task.CompletedTask; } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs index 60eecd0cd2..da010b83c9 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs @@ -20,6 +20,7 @@ public class AbpRabbitMqModule : AbpModule foreach (var connectionFactory in options.Connections.Values) { connectionFactory.DispatchConsumersAsync = true; + connectionFactory.AutomaticRecoveryEnabled = false; } }); } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs index d795342c1d..b7ed4c80c8 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs @@ -42,6 +42,18 @@ public class ChannelPool : IChannelPool, ISingletonDependency poolItem.Acquire(); + if (poolItem.Channel.IsClosed) + { + poolItem.Dispose(); + Channels.TryRemove(channelName, out _); + poolItem = Channels.GetOrAdd( + channelName, + _ => new ChannelPoolItem(CreateChannel(channelName, connectionName)) + ); + + poolItem.Acquire(); + } + return new ChannelAccessor( poolItem.Channel, channelName, diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index 8c046ffffd..5f4556b96a 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -24,21 +24,19 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency public virtual IConnection Get(string? connectionName = null) { connectionName ??= RabbitMqConnections.DefaultConnectionName; - + var connectionFactory = Options.Connections.GetOrDefault(connectionName); try { - var lazyConnection = Connections.GetOrAdd( - connectionName, () => new Lazy(() => - { - var connection = Options.Connections.GetOrDefault(connectionName); - var hostnames = connection.HostName.TrimEnd(';').Split(';'); - // Handle Rabbit MQ Cluster. - return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames); - - }) - ); - - return lazyConnection.Value; + var connection = GetConnection(connectionName, connectionFactory); + + if (connection.IsOpen) + { + return connection; + } + + connection.Dispose(); + Connections.TryRemove(connectionName, out _); + return GetConnection(connectionName, connectionFactory); } catch (Exception) { @@ -47,6 +45,20 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency } } + protected virtual IConnection GetConnection(string connectionName, ConnectionFactory connectionFactory) + { + return Connections.GetOrAdd( + connectionName, () => new Lazy(() => + { + var hostnames = connectionFactory.HostName.TrimEnd(';').Split(';'); + // Handle Rabbit MQ Cluster. + return hostnames.Length == 1 + ? connectionFactory.CreateConnection() + : connectionFactory.CreateConnection(hostnames); + }) + ).Value; + } + public void Dispose() { if (_isDisposed)