diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index 08d9d48036..5856f7bb16 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -22,8 +22,7 @@ namespace Volo.Abp.RabbitMQ public virtual IConnection Get(string connectionName = null) { - connectionName = connectionName - ?? RabbitMqConnections.DefaultConnectionName; + connectionName ??= RabbitMqConnections.DefaultConnectionName; return Connections.GetOrAdd( connectionName, @@ -58,4 +57,4 @@ namespace Volo.Abp.RabbitMQ Connections.Clear(); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs index a30db4c96a..f2b5168d82 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs @@ -9,7 +9,7 @@ namespace Volo.Abp.RabbitMQ public class RabbitMqConnections : Dictionary { public const string DefaultConnectionName = "Default"; - + [NotNull] public ConnectionFactory Default { @@ -19,7 +19,7 @@ namespace Volo.Abp.RabbitMQ public RabbitMqConnections() { - Default = new ConnectionFactory(); + Default = new ConnectionFactory() { DispatchConsumersAsync = true }; } public ConnectionFactory GetOrDefault(string connectionName) @@ -32,4 +32,4 @@ namespace Volo.Abp.RabbitMQ return Default; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index fb1b0c7fe2..b94663d886 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -143,10 +143,10 @@ namespace Volo.Abp.RabbitMQ try { - var channel = ConnectionPool + Channel = ConnectionPool .Get(ConnectionName) .CreateModel(); - channel.ExchangeDeclare( + Channel.ExchangeDeclare( exchange: Exchange.ExchangeName, type: Exchange.Type, durable: Exchange.Durable, @@ -154,7 +154,7 @@ namespace Volo.Abp.RabbitMQ arguments: Exchange.Arguments ); - channel.QueueDeclare( + Channel.QueueDeclare( queue: Queue.QueueName, durable: Queue.Durable, exclusive: Queue.Exclusive, @@ -162,19 +162,14 @@ namespace Volo.Abp.RabbitMQ arguments: Queue.Arguments ); - var consumer = new EventingBasicConsumer(channel); - consumer.Received += async (model, basicDeliverEventArgs) => - { - await HandleIncomingMessageAsync(channel, basicDeliverEventArgs); - }; + var consumer = new AsyncEventingBasicConsumer(Channel); + consumer.Received += HandleIncomingMessageAsync; - channel.BasicConsume( + Channel.BasicConsume( queue: Queue.QueueName, autoAck: false, consumer: consumer ); - - Channel = channel; } catch (Exception ex) { @@ -183,16 +178,16 @@ namespace Volo.Abp.RabbitMQ } } - protected virtual async Task HandleIncomingMessageAsync(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs) + protected virtual async Task HandleIncomingMessageAsync(object sender, BasicDeliverEventArgs basicDeliverEventArgs) { try { foreach (var callback in Callbacks) { - await callback(channel, basicDeliverEventArgs); + await callback(Channel, basicDeliverEventArgs); } - channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false); + Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false); } catch (Exception ex) {