Browse Source

Merge pull request #7071 from abpframework/liangshiwei/rabbitmq-async

Always use async consumer implementations
pull/7080/head
maliming 5 years ago
committed by GitHub
parent
commit
b4bfa11a4e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs
  2. 6
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs
  3. 23
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs

5
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

@ -22,8 +22,7 @@ namespace Volo.Abp.RabbitMQ
public virtual IConnection Get(string connectionName = null)
{
connectionName = connectionName
?? RabbitMqConnections.DefaultConnectionName;
connectionName ??= RabbitMqConnections.DefaultConnectionName;
return Connections.GetOrAdd(
connectionName,
@ -58,4 +57,4 @@ namespace Volo.Abp.RabbitMQ
Connections.Clear();
}
}
}
}

6
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqConnections.cs

@ -9,7 +9,7 @@ namespace Volo.Abp.RabbitMQ
public class RabbitMqConnections : Dictionary<string, ConnectionFactory>
{
public const string DefaultConnectionName = "Default";
[NotNull]
public ConnectionFactory Default
{
@ -19,7 +19,7 @@ namespace Volo.Abp.RabbitMQ
public RabbitMqConnections()
{
Default = new ConnectionFactory();
Default = new ConnectionFactory() { DispatchConsumersAsync = true };
}
public ConnectionFactory GetOrDefault(string connectionName)
@ -32,4 +32,4 @@ namespace Volo.Abp.RabbitMQ
return Default;
}
}
}
}

23
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs

@ -143,10 +143,10 @@ namespace Volo.Abp.RabbitMQ
try
{
var channel = ConnectionPool
Channel = ConnectionPool
.Get(ConnectionName)
.CreateModel();
channel.ExchangeDeclare(
Channel.ExchangeDeclare(
exchange: Exchange.ExchangeName,
type: Exchange.Type,
durable: Exchange.Durable,
@ -154,7 +154,7 @@ namespace Volo.Abp.RabbitMQ
arguments: Exchange.Arguments
);
channel.QueueDeclare(
Channel.QueueDeclare(
queue: Queue.QueueName,
durable: Queue.Durable,
exclusive: Queue.Exclusive,
@ -162,19 +162,14 @@ namespace Volo.Abp.RabbitMQ
arguments: Queue.Arguments
);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, basicDeliverEventArgs) =>
{
await HandleIncomingMessageAsync(channel, basicDeliverEventArgs);
};
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;
channel.BasicConsume(
Channel.BasicConsume(
queue: Queue.QueueName,
autoAck: false,
consumer: consumer
);
Channel = channel;
}
catch (Exception ex)
{
@ -183,16 +178,16 @@ namespace Volo.Abp.RabbitMQ
}
}
protected virtual async Task HandleIncomingMessageAsync(IModel channel, BasicDeliverEventArgs basicDeliverEventArgs)
protected virtual async Task HandleIncomingMessageAsync(object sender, BasicDeliverEventArgs basicDeliverEventArgs)
{
try
{
foreach (var callback in Callbacks)
{
await callback(channel, basicDeliverEventArgs);
await callback(Channel, basicDeliverEventArgs);
}
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false);
Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false);
}
catch (Exception ex)
{

Loading…
Cancel
Save