Browse Source

Merge pull request #22119 from abpframework/rabbitmq-patch

Fix duplicate consumption of events when connection is broken
pull/22147/head
maliming 1 year ago
committed by GitHub
parent
commit
4f3dff00cc
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs
  2. 1
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs
  3. 12
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs
  4. 38
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

2
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

@ -122,7 +122,7 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
protected virtual Task EnsureInitializedAsync()
{
if (ChannelAccessor != null)
if (ChannelAccessor != null && ChannelAccessor.Channel.IsOpen)
{
return Task.CompletedTask;
}

1
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs

@ -20,6 +20,7 @@ public class AbpRabbitMqModule : AbpModule
foreach (var connectionFactory in options.Connections.Values)
{
connectionFactory.DispatchConsumersAsync = true;
connectionFactory.AutomaticRecoveryEnabled = false;
}
});
}

12
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs

@ -42,6 +42,18 @@ public class ChannelPool : IChannelPool, ISingletonDependency
poolItem.Acquire();
if (poolItem.Channel.IsClosed)
{
poolItem.Dispose();
Channels.TryRemove(channelName, out _);
poolItem = Channels.GetOrAdd(
channelName,
_ => new ChannelPoolItem(CreateChannel(channelName, connectionName))
);
poolItem.Acquire();
}
return new ChannelAccessor(
poolItem.Channel,
channelName,

38
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

@ -24,21 +24,19 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
public virtual IConnection Get(string? connectionName = null)
{
connectionName ??= RabbitMqConnections.DefaultConnectionName;
var connectionFactory = Options.Connections.GetOrDefault(connectionName);
try
{
var lazyConnection = Connections.GetOrAdd(
connectionName, () => new Lazy<IConnection>(() =>
{
var connection = Options.Connections.GetOrDefault(connectionName);
var hostnames = connection.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames);
})
);
return lazyConnection.Value;
var connection = GetConnection(connectionName, connectionFactory);
if (connection.IsOpen)
{
return connection;
}
connection.Dispose();
Connections.TryRemove(connectionName, out _);
return GetConnection(connectionName, connectionFactory);
}
catch (Exception)
{
@ -47,6 +45,20 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
}
}
protected virtual IConnection GetConnection(string connectionName, ConnectionFactory connectionFactory)
{
return Connections.GetOrAdd(
connectionName, () => new Lazy<IConnection>(() =>
{
var hostnames = connectionFactory.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1
? connectionFactory.CreateConnection()
: connectionFactory.CreateConnection(hostnames);
})
).Value;
}
public void Dispose()
{
if (_isDisposed)

Loading…
Cancel
Save