Browse Source

Update `RabbitMQ.Client` to 7.X.

pull/22510/head
maliming 10 months ago
parent
commit
f1e6770e3c
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 2
      Directory.Packages.props
  2. 63
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs
  3. 7
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueConfiguration.cs
  4. 6
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/AbpRabbitMqEventBusOptions.cs
  5. 45
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  6. 1
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs
  7. 61
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs
  8. 78
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs
  9. 6
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs
  10. 2
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelAccessor.cs
  11. 3
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs
  12. 3
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs
  13. 2
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IRabbitMqMessageConsumer.cs
  14. 15
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs
  15. 55
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs

2
Directory.Packages.props

@ -142,7 +142,7 @@
<PackageVersion Include="Quartz.Extensions.DependencyInjection" Version="3.13.0" />
<PackageVersion Include="Quartz.Plugins.TimeZoneConverter" Version="3.13.0" />
<PackageVersion Include="Quartz.Serialization.Json" Version="3.13.0" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="RabbitMQ.Client" Version="7.1.2" />
<PackageVersion Include="Rebus" Version="8.6.0" />
<PackageVersion Include="Rebus.ServiceProvider" Version="10.2.0" />
<PackageVersion Include="Scriban" Version="5.10.0" />

63
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -120,46 +121,44 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
ChannelAccessor?.Dispose();
}
protected virtual Task EnsureInitializedAsync()
protected virtual async Task EnsureInitializedAsync()
{
if (ChannelAccessor != null && ChannelAccessor.Channel.IsOpen)
{
return Task.CompletedTask;
return;
}
ChannelAccessor = ChannelPool.Acquire(
ChannelAccessor = await ChannelPool.AcquireAsync(
ChannelPrefix + QueueConfiguration.QueueName,
QueueConfiguration.ConnectionName
);
var result = QueueConfiguration.Declare(ChannelAccessor.Channel);
var result = await QueueConfiguration.DeclareAsync(ChannelAccessor.Channel);
Logger.LogDebug($"RabbitMQ Queue '{QueueConfiguration.QueueName}' has {result.MessageCount} messages and {result.ConsumerCount} consumers.");
// Declare delayed queue
QueueConfiguration.DeclareDelayed(ChannelAccessor.Channel);
await QueueConfiguration.DeclareDelayedAsync(ChannelAccessor.Channel);
if (AbpBackgroundJobOptions.IsJobExecutionEnabled)
{
if (QueueConfiguration.PrefetchCount.HasValue)
{
ChannelAccessor.Channel.BasicQos(0, QueueConfiguration.PrefetchCount.Value, false);
await ChannelAccessor.Channel.BasicQosAsync(0, QueueConfiguration.PrefetchCount.Value, false);
}
Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel);
Consumer.Received += MessageReceived;
Consumer.ReceivedAsync += MessageReceived;
//TODO: What BasicConsume returns?
ChannelAccessor.Channel.BasicConsume(
await ChannelAccessor.Channel.BasicConsumeAsync(
queue: QueueConfiguration.QueueName,
autoAck: false,
consumer: Consumer
);
}
return Task.CompletedTask;
}
protected virtual Task PublishAsync(
protected virtual async Task PublishAsync(
TArgs args,
BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
@ -167,29 +166,27 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
//TODO: How to handle priority
var routingKey = QueueConfiguration.QueueName;
var basicProperties = CreateBasicPropertiesToPublish();
var basicProperties = new BasicProperties
{
Persistent = true
};
if (delay.HasValue)
{
routingKey = QueueConfiguration.DelayedQueueName;
basicProperties.Expiration = delay.Value.TotalMilliseconds.ToString();
basicProperties.Expiration = delay.Value.TotalMilliseconds.ToString(CultureInfo.InvariantCulture);
}
ChannelAccessor!.Channel.BasicPublish(
exchange: "",
routingKey: routingKey,
basicProperties: basicProperties,
body: Serializer.Serialize(args!)
);
return Task.CompletedTask;
}
protected virtual IBasicProperties CreateBasicPropertiesToPublish()
{
var properties = ChannelAccessor!.Channel.CreateBasicProperties();
properties.Persistent = true;
return properties;
if (ChannelAccessor != null)
{
await ChannelAccessor.Channel.BasicPublishAsync(
exchange: "",
routingKey: routingKey,
mandatory: false,
basicProperties: basicProperties,
body: Serializer.Serialize(args!)
);
}
}
protected virtual async Task MessageReceived(object sender, BasicDeliverEventArgs ea)
@ -205,17 +202,17 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
try
{
await JobExecuter.ExecuteAsync(context);
ChannelAccessor!.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
await ChannelAccessor!.Channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (BackgroundJobExecutionException)
{
//TODO: Reject like that?
ChannelAccessor!.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
await ChannelAccessor!.Channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: true);
}
catch (Exception)
{
//TODO: Reject like that?
ChannelAccessor!.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
await ChannelAccessor!.Channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: false);
}
}
}

7
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueConfiguration.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Volo.Abp.RabbitMQ;
@ -34,15 +35,15 @@ public class JobQueueConfiguration : QueueDeclareConfiguration
DelayedQueueName = delayedQueueName;
}
public virtual QueueDeclareOk DeclareDelayed(IModel channel)
public virtual async Task<QueueDeclareOk> DeclareDelayedAsync(IChannel channel)
{
var delayedArguments = new Dictionary<string, object>(Arguments)
var delayedArguments = new Dictionary<string, object?>(Arguments)
{
["x-dead-letter-routing-key"] = QueueName,
["x-dead-letter-exchange"] = string.Empty
};
return channel.QueueDeclare(
return await channel.QueueDeclareAsync(
queue: DelayedQueueName,
durable: Durable,
exclusive: Exclusive,

6
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/AbpRabbitMqEventBusOptions.cs

@ -17,10 +17,10 @@ public class AbpRabbitMqEventBusOptions
public ushort? PrefetchCount { get; set; }
public IDictionary<string, object> QueueArguments { get; set; } = new Dictionary<string, object>();
public IDictionary<string, object?> QueueArguments { get; set; } = new Dictionary<string, object?>();
public IDictionary<string, object?> ExchangeArguments { get; set; } = new Dictionary<string, object?>();
public IDictionary<string, object> ExchangeArguments { get; set; } = new Dictionary<string, object>();
public string GetExchangeTypeOrDefault()
{
return string.IsNullOrEmpty(ExchangeType)

45
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -97,7 +97,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
private async Task ProcessEventAsync(IChannel channel, BasicDeliverEventArgs ea)
{
var eventName = ea.RoutingKey;
var eventType = EventTypes.GetOrDefault(eventName);
@ -224,10 +224,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
IEnumerable<OutgoingEventInfo> outgoingEvents,
OutboxConfig outboxConfig)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
using (var channel = await (await ConnectionPool.GetAsync(AbpRabbitMqEventBusOptions.ConnectionName))
.CreateChannelAsync(new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true)))
{
var outgoingEventArray = outgoingEvents.ToArray();
channel.ConfirmSelect();
foreach (var outgoingEvent in outgoingEventArray)
{
@ -248,8 +248,6 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
});
}
}
channel.WaitForConfirmsOrDie();
}
}
@ -293,31 +291,34 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
return PublishAsync(eventName, body, headersArguments, eventId, correlationId);
}
protected virtual Task PublishAsync(
protected virtual async Task PublishAsync(
string eventName,
byte[] body,
Dictionary<string, object>? headersArguments = null,
Guid? eventId = null,
string? correlationId = null)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
using (var channel = await (await ConnectionPool.GetAsync(AbpRabbitMqEventBusOptions.ConnectionName))
.CreateChannelAsync(new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true)))
{
return PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId);
await PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId);
}
}
protected virtual Task PublishAsync(
IModel channel,
protected virtual async Task PublishAsync(
IChannel channel,
string eventName,
byte[] body,
Dictionary<string, object>? headersArguments = null,
Guid? eventId = null,
string? correlationId = null)
{
EnsureExchangeExists(channel);
await EnsureExchangeExistsAsync(channel);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
var properties = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent
};
if (properties.MessageId.IsNullOrEmpty())
{
@ -331,18 +332,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
SetEventMessageHeaders(properties, headersArguments);
channel.BasicPublish(
await channel.BasicPublishAsync(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body
);
return Task.CompletedTask;
}
private void EnsureExchangeExists(IModel channel)
protected virtual async Task EnsureExchangeExistsAsync(IChannel channel)
{
if (_exchangeCreated)
{
@ -351,14 +350,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
try
{
using (var temporaryChannel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
using (var temporaryChannel = await(await ConnectionPool.GetAsync(AbpRabbitMqEventBusOptions.ConnectionName))
.CreateChannelAsync(new CreateChannelOptions(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true)))
{
temporaryChannel.ExchangeDeclarePassive(AbpRabbitMqEventBusOptions.ExchangeName);
await temporaryChannel.ExchangeDeclarePassiveAsync(AbpRabbitMqEventBusOptions.ExchangeName);
}
}
catch (Exception)
{
channel.ExchangeDeclare(
await channel.ExchangeDeclareAsync(
AbpRabbitMqEventBusOptions.ExchangeName,
AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
durable: true
@ -367,14 +368,14 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
_exchangeCreated = true;
}
private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object>? headersArguments)
protected virtual void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object>? headersArguments)
{
if (headersArguments == null)
{
return;
}
properties.Headers ??= new Dictionary<string, object>();
properties.Headers ??= new Dictionary<string, object?>();
foreach (var header in headersArguments)
{

1
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs

@ -19,7 +19,6 @@ public class AbpRabbitMqModule : AbpModule
{
foreach (var connectionFactory in options.Connections.Values)
{
connectionFactory.DispatchConsumersAsync = true;
connectionFactory.AutomaticRecoveryEnabled = false;
}
});

61
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ChannelPool.cs

@ -3,10 +3,12 @@ using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Client;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace Volo.Abp.RabbitMQ;
@ -16,6 +18,8 @@ public class ChannelPool : IChannelPool, ISingletonDependency
protected ConcurrentDictionary<string, ChannelPoolItem> Channels { get; }
protected SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
protected bool IsDisposed { get; private set; }
protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
@ -29,16 +33,33 @@ public class ChannelPool : IChannelPool, ISingletonDependency
Logger = NullLogger<ChannelPool>.Instance;
}
public virtual IChannelAccessor Acquire(string? channelName = null, string? connectionName = null)
public virtual async Task<IChannelAccessor> AcquireAsync(string? channelName = null, string? connectionName = null)
{
CheckDisposed();
channelName = channelName ?? "";
var poolItem = Channels.GetOrAdd(
channelName,
_ => new ChannelPoolItem(CreateChannel(channelName, connectionName))
);
ChannelPoolItem poolItem;
if (Channels.TryGetValue(channelName, out var existingChannelPoolItem))
{
poolItem = existingChannelPoolItem;
}
else
{
using (await Semaphore.LockAsync())
{
if (!Channels.TryGetValue(channelName, out var channel))
{
poolItem = new ChannelPoolItem(await CreateChannelAsync(channelName, connectionName));
Channels.TryAdd(channelName, poolItem);
}
else
{
poolItem = channel;
}
}
}
poolItem.Acquire();
@ -46,11 +67,13 @@ public class ChannelPool : IChannelPool, ISingletonDependency
{
poolItem.Dispose();
Channels.TryRemove(channelName, out _);
poolItem = Channels.GetOrAdd(
channelName,
_ => new ChannelPoolItem(CreateChannel(channelName, connectionName))
);
using (await Semaphore.LockAsync())
{
poolItem = new ChannelPoolItem(await CreateChannelAsync(channelName, connectionName));
Channels.TryAdd(channelName, poolItem);
}
poolItem.Acquire();
}
@ -61,14 +84,14 @@ public class ChannelPool : IChannelPool, ISingletonDependency
);
}
protected virtual IModel CreateChannel(string channelName, string? connectionName)
protected virtual async Task<IChannel> CreateChannelAsync(string channelName, string? connectionName)
{
return ConnectionPool
.Get(connectionName)
.CreateModel();
return await (await ConnectionPool
.GetAsync(connectionName))
.CreateChannelAsync();
}
protected void CheckDisposed()
protected virtual void CheckDisposed()
{
if (IsDisposed)
{
@ -130,7 +153,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency
protected class ChannelPoolItem : IDisposable
{
public IModel Channel { get; }
public IChannel Channel { get; }
public bool IsInUse {
get => _isInUse;
@ -138,7 +161,7 @@ public class ChannelPool : IChannelPool, ISingletonDependency
}
private volatile bool _isInUse;
public ChannelPoolItem(IModel channel)
public ChannelPoolItem(IChannel channel)
{
Channel = channel;
}
@ -186,13 +209,13 @@ public class ChannelPool : IChannelPool, ISingletonDependency
protected class ChannelAccessor : IChannelAccessor
{
public IModel Channel { get; }
public IChannel Channel { get; }
public string Name { get; }
private readonly Action _disposeAction;
public ChannelAccessor(IModel channel, string name, Action disposeAction)
public ChannelAccessor(IChannel channel, string name, Action disposeAction)
{
_disposeAction = disposeAction;
Name = name;

78
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

@ -1,9 +1,11 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace Volo.Abp.RabbitMQ;
@ -11,52 +13,64 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
{
protected AbpRabbitMqOptions Options { get; }
protected ConcurrentDictionary<string, Lazy<IConnection>> Connections { get; }
protected ConcurrentDictionary<string, IConnection> Connections { get; }
protected SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
private bool _isDisposed;
public ConnectionPool(IOptions<AbpRabbitMqOptions> options)
{
Options = options.Value;
Connections = new ConcurrentDictionary<string, Lazy<IConnection>>();
Connections = new ConcurrentDictionary<string, IConnection>();
}
public virtual IConnection Get(string? connectionName = null)
public virtual async Task<IConnection> GetAsync(string? connectionName = null)
{
connectionName ??= RabbitMqConnections.DefaultConnectionName;
var connectionFactory = Options.Connections.GetOrDefault(connectionName);
try
IConnection connection;
if (Connections.TryGetValue(connectionName, out var existingConnection))
{
var connection = GetConnection(connectionName, connectionFactory);
if (connection.IsOpen)
{
return connection;
}
connection.Dispose();
Connections.TryRemove(connectionName, out _);
return GetConnection(connectionName, connectionFactory);
connection = existingConnection;
}
catch (Exception)
else
{
Connections.TryRemove(connectionName, out _);
throw;
using (await Semaphore.LockAsync())
{
var connectionFactory = Options.Connections.GetOrDefault(connectionName);
try
{
connection = await GetConnectionAsync(connectionName, connectionFactory);
Connections.TryAdd(connectionName, connection);
if (!connection.IsOpen)
{
connection.Dispose();
Connections.TryRemove(connectionName, out _);
connection = await GetConnectionAsync(connectionName, connectionFactory);
Connections.TryAdd(connectionName, connection);
}
}
catch (Exception)
{
Connections.TryRemove(connectionName, out _);
throw;
}
}
}
return connection;
}
protected virtual IConnection GetConnection(string connectionName, ConnectionFactory connectionFactory)
protected virtual async Task<IConnection> GetConnectionAsync(string connectionName, ConnectionFactory connectionFactory)
{
return Connections.GetOrAdd(
connectionName, () => new Lazy<IConnection>(() =>
{
var hostnames = connectionFactory.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1
? connectionFactory.CreateConnection()
: connectionFactory.CreateConnection(hostnames);
})
).Value;
var hostnames = connectionFactory.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1
? await connectionFactory.CreateConnectionAsync()
: await connectionFactory.CreateConnectionAsync(hostnames);
}
public void Dispose()
@ -72,11 +86,11 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
{
try
{
connection.Value.Dispose();
connection.Dispose();
}
catch
{
// ignored
}
}

6
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ExchangeDeclareConfiguration.cs

@ -12,19 +12,19 @@ public class ExchangeDeclareConfiguration
public bool AutoDelete { get; set; }
public IDictionary<string, object> Arguments { get; }
public IDictionary<string, object?> Arguments { get; }
public ExchangeDeclareConfiguration(
string exchangeName,
string type,
bool durable = false,
bool autoDelete = false,
IDictionary<string, object>? arguments = null)
IDictionary<string, object?>? arguments = null)
{
ExchangeName = exchangeName;
Type = type;
Durable = durable;
AutoDelete = autoDelete;
Arguments = arguments?? new Dictionary<string, object>();
Arguments = arguments?? new Dictionary<string, object?>();
}
}

2
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelAccessor.cs

@ -10,7 +10,7 @@ public interface IChannelAccessor : IDisposable
/// Never dispose the <see cref="Channel"/> object.
/// Instead, dispose the <see cref="IChannelAccessor"/> after usage.
/// </summary>
IModel Channel { get; }
IChannel Channel { get; }
/// <summary>
/// Name of the channel.

3
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IChannelPool.cs

@ -1,8 +1,9 @@
using System;
using System.Threading.Tasks;
namespace Volo.Abp.RabbitMQ;
public interface IChannelPool : IDisposable
{
IChannelAccessor Acquire(string? channelName = null, string? connectionName = null);
Task<IChannelAccessor> AcquireAsync(string? channelName = null, string? connectionName = null);
}

3
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs

@ -1,9 +1,10 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace Volo.Abp.RabbitMQ;
public interface IConnectionPool : IDisposable
{
IConnection Get(string? connectionName = null);
Task<IConnection> GetAsync(string? connectionName = null);
}

2
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IRabbitMqMessageConsumer.cs

@ -11,5 +11,5 @@ public interface IRabbitMqMessageConsumer
Task UnbindAsync(string routingKey);
void OnMessageReceived(Func<IModel, BasicDeliverEventArgs, Task> callback);
void OnMessageReceived(Func<IChannel, BasicDeliverEventArgs, Task> callback);
}

15
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDeclareConfiguration.cs

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using JetBrains.Annotations;
using RabbitMQ.Client;
@ -13,10 +14,10 @@ public class QueueDeclareConfiguration
public bool Exclusive { get; set; }
public bool AutoDelete { get; set; }
public ushort? PrefetchCount { get; set; }
public IDictionary<string, object> Arguments { get; }
public ushort? PrefetchCount { get; set; }
public IDictionary<string, object?> Arguments { get; }
public QueueDeclareConfiguration(
[NotNull] string queueName,
@ -24,19 +25,19 @@ public class QueueDeclareConfiguration
bool exclusive = false,
bool autoDelete = false,
ushort? prefetchCount = null,
IDictionary<string, object>? arguments = null)
IDictionary<string, object?>? arguments = null)
{
QueueName = queueName;
Durable = durable;
Exclusive = exclusive;
AutoDelete = autoDelete;
Arguments = arguments?? new Dictionary<string, object>();
Arguments = arguments?? new Dictionary<string, object?>();
PrefetchCount = prefetchCount;
}
public virtual QueueDeclareOk Declare(IModel channel)
public virtual async Task<QueueDeclareOk> DeclareAsync(IChannel channel)
{
return channel.QueueDeclare(
return await channel.QueueDeclareAsync(
queue: QueueName,
durable: Durable,
exclusive: Exclusive,

55
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs

@ -5,6 +5,7 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling;
@ -28,13 +29,13 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
protected string? ConnectionName { get; private set; }
protected ConcurrentBag<Func<IModel, BasicDeliverEventArgs, Task>> Callbacks { get; }
protected ConcurrentBag<Func<IChannel, BasicDeliverEventArgs, Task>> Callbacks { get; }
protected IModel? Channel { get; private set; }
protected IChannel? Channel { get; private set; }
protected ConcurrentQueue<QueueBindCommand> QueueBindCommands { get; }
protected object ChannelSendSyncLock { get; } = new object();
protected SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
public RabbitMqMessageConsumer(
IConnectionPool connectionPool,
@ -47,7 +48,7 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
Logger = NullLogger<RabbitMqMessageConsumer>.Instance;
QueueBindCommands = new ConcurrentQueue<QueueBindCommand>();
Callbacks = new ConcurrentBag<Func<IModel, BasicDeliverEventArgs, Task>>();
Callbacks = new ConcurrentBag<Func<IChannel, BasicDeliverEventArgs, Task>>();
Timer.Period = 5000; //5 sec.
Timer.Elapsed = Timer_Elapsed;
@ -88,21 +89,21 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
return;
}
lock (ChannelSendSyncLock)
using (await Semaphore.LockAsync())
{
if (QueueBindCommands.TryPeek(out var command))
{
switch (command.Type)
{
case QueueBindType.Bind:
Channel.QueueBind(
await Channel.QueueBindAsync(
queue: Queue.QueueName,
exchange: Exchange.ExchangeName,
routingKey: command.RoutingKey
);
break;
case QueueBindType.Unbind:
Channel.QueueUnbind(
await Channel.QueueUnbindAsync(
queue: Queue.QueueName,
exchange: Exchange.ExchangeName,
routingKey: command.RoutingKey
@ -124,7 +125,7 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
}
}
public virtual void OnMessageReceived(Func<IModel, BasicDeliverEventArgs, Task> callback)
public virtual void OnMessageReceived(Func<IChannel, BasicDeliverEventArgs, Task> callback)
{
Callbacks.Add(callback);
}
@ -144,11 +145,11 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
try
{
Channel = ConnectionPool
.Get(ConnectionName)
.CreateModel();
Channel = await (await ConnectionPool
.GetAsync(ConnectionName))
.CreateChannelAsync();
Channel.ExchangeDeclare(
await Channel.ExchangeDeclareAsync(
exchange: Exchange.ExchangeName,
type: Exchange.Type,
durable: Exchange.Durable,
@ -156,7 +157,7 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
arguments: Exchange.Arguments
);
Channel.QueueDeclare(
await Channel.QueueDeclareAsync(
queue: Queue.QueueName,
durable: Queue.Durable,
exclusive: Queue.Exclusive,
@ -166,13 +167,13 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
if (Queue.PrefetchCount.HasValue)
{
Channel.BasicQos(0, Queue.PrefetchCount.Value, false);
await Channel.BasicQosAsync(0, Queue.PrefetchCount.Value, false);
}
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;
Channel.BasicConsume(
consumer.ReceivedAsync += HandleIncomingMessageAsync;
await Channel.BasicConsumeAsync(
queue: Queue.QueueName,
autoAck: false,
consumer: consumer
@ -194,17 +195,23 @@ public class RabbitMqMessageConsumer : IRabbitMqMessageConsumer, ITransientDepen
await callback(Channel!, basicDeliverEventArgs);
}
Channel?.BasicAck(basicDeliverEventArgs.DeliveryTag, multiple: false);
if (Channel != null)
{
await Channel.BasicAckAsync(basicDeliverEventArgs.DeliveryTag, multiple: false);
}
}
catch (Exception ex)
{
try
{
Channel?.BasicNack(
basicDeliverEventArgs.DeliveryTag,
multiple: false,
requeue: true
);
if (Channel != null)
{
await Channel.BasicNackAsync(
basicDeliverEventArgs.DeliveryTag,
multiple: false,
requeue: true
);
}
}
// ReSharper disable once EmptyGeneralCatchClause
catch { }

Loading…
Cancel
Save