From de10dcc2c4eeb72a062049155f62c9a2497c1b26 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Fri, 7 Feb 2025 17:39:42 +0800 Subject: [PATCH 1/3] Fix duplicate consumption of events when connection is broken --- .../Abp/BackgroundJobs/RabbitMQ/JobQueue.cs | 2 +- .../Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs | 1 + .../Volo/Abp/RabbitMQ/ConnectionPool.cs | 38 ++++++++++++------- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs index cf88778089..1dbe4a6dcc 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs @@ -122,7 +122,7 @@ public class JobQueue : IJobQueue protected virtual Task EnsureInitializedAsync() { - if (ChannelAccessor != null) + if (ChannelAccessor != null && ChannelAccessor.Channel.IsOpen) { return Task.CompletedTask; } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs index 60eecd0cd2..da010b83c9 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs @@ -20,6 +20,7 @@ public class AbpRabbitMqModule : AbpModule foreach (var connectionFactory in options.Connections.Values) { connectionFactory.DispatchConsumersAsync = true; + connectionFactory.AutomaticRecoveryEnabled = false; } }); } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index 8c046ffffd..5f4556b96a 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -24,21 +24,19 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency public virtual IConnection Get(string? connectionName = null) { connectionName ??= RabbitMqConnections.DefaultConnectionName; - + var connectionFactory = Options.Connections.GetOrDefault(connectionName); try { - var lazyConnection = Connections.GetOrAdd( - connectionName, () => new Lazy(() => - { - var connection = Options.Connections.GetOrDefault(connectionName); - var hostnames = connection.HostName.TrimEnd(';').Split(';'); - // Handle Rabbit MQ Cluster. - return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames); - - }) - ); - - return lazyConnection.Value; + var connection = GetConnection(connectionName, connectionFactory); + + if (connection.IsOpen) + { + return connection; + } + + connection.Dispose(); + Connections.TryRemove(connectionName, out _); + return GetConnection(connectionName, connectionFactory); } catch (Exception) { @@ -47,6 +45,20 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency } } + protected virtual IConnection GetConnection(string connectionName, ConnectionFactory connectionFactory) + { + return Connections.GetOrAdd( + connectionName, () => new Lazy(() => + { + var hostnames = connectionFactory.HostName.TrimEnd(';').Split(';'); + // Handle Rabbit MQ Cluster. + return hostnames.Length == 1 + ? connectionFactory.CreateConnection() + : connectionFactory.CreateConnection(hostnames); + }) + ).Value; + } + public void Dispose() { if (_isDisposed) From d348dc84466fdd4be369ab0172c36251a383f0c8 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Tue, 11 Feb 2025 16:24:25 +0800 Subject: [PATCH 2/3] Check if channel is closed --- .../Volo/Abp/RabbitMQ/ChannelPool.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs index d795342c1d..85a11a0961 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs @@ -42,6 +42,17 @@ public class ChannelPool : IChannelPool, ISingletonDependency poolItem.Acquire(); + if (poolItem.Channel.IsClosed) + { + Channels.TryRemove(channelName, out _); + poolItem = Channels.GetOrAdd( + channelName, + _ => new ChannelPoolItem(CreateChannel(channelName, connectionName)) + ); + + poolItem.Acquire(); + } + return new ChannelAccessor( poolItem.Channel, channelName, From facb8dfb9a2767ad656363bd05bb7ce9d472a86e Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Tue, 11 Feb 2025 16:27:08 +0800 Subject: [PATCH 3/3] Update ChannelPool.cs --- framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs index 85a11a0961..b7ed4c80c8 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs @@ -44,6 +44,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency if (poolItem.Channel.IsClosed) { + poolItem.Dispose(); Channels.TryRemove(channelName, out _); poolItem = Channels.GetOrAdd( channelName,