|
|
|
@ -23,15 +23,15 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen |
|
|
|
|
|
|
|
protected AbpAsyncTimer Timer { get; } |
|
|
|
|
|
|
|
protected ExchangeDeclareConfiguration Exchange { get; private set; } |
|
|
|
protected ExchangeDeclareConfiguration Exchange { get; private set; } = default!; |
|
|
|
|
|
|
|
protected QueueDeclareConfiguration Queue { get; private set; } |
|
|
|
protected QueueDeclareConfiguration Queue { get; private set; } = default!; |
|
|
|
|
|
|
|
protected string ConnectionName { get; private set; } |
|
|
|
protected string? ConnectionName { get; private set; } |
|
|
|
|
|
|
|
protected ConcurrentBag<Func<IModel, BasicDeliverEventArgs, Task>> Callbacks { get; } |
|
|
|
|
|
|
|
protected IModel Channel { get; private set; } |
|
|
|
protected IModel? Channel { get; private set; } |
|
|
|
|
|
|
|
protected ConcurrentQueue<QueueBindCommand> QueueBindCommands { get; } |
|
|
|
|
|
|
|
@ -58,7 +58,7 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen |
|
|
|
public void Initialize( |
|
|
|
[NotNull] ExchangeDeclareConfiguration exchange, |
|
|
|
[NotNull] QueueDeclareConfiguration queue, |
|
|
|
string connectionName = null) |
|
|
|
string? connectionName = null) |
|
|
|
{ |
|
|
|
Exchange = Check.NotNull(exchange, nameof(exchange)); |
|
|
|
Queue = Check.NotNull(queue, nameof(queue)); |
|
|
|
@ -192,16 +192,16 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen |
|
|
|
{ |
|
|
|
foreach (var callback in Callbacks) |
|
|
|
{ |
|
|
|
await callback(Channel, basicDeliverEventArgs); |
|
|
|
await callback(Channel!, basicDeliverEventArgs); |
|
|
|
} |
|
|
|
|
|
|
|
Channel.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false); |
|
|
|
Channel?.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
Channel.BasicNack( |
|
|
|
Channel?.BasicNack( |
|
|
|
basicDeliverEventArgs.DeliveryTag, |
|
|
|
multiple: false, |
|
|
|
requeue: true |
|
|
|
|