|
|
|
@ -20,7 +20,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
[ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus))] |
|
|
|
public class RabbitMqDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency |
|
|
|
{ |
|
|
|
protected RabbitMqDistributedEventBusOptions RabbitMqDistributedEventBusOptions { get; } |
|
|
|
protected RabbitMqEventBusOptions RabbitMqEventBusOptions { get; } |
|
|
|
protected DistributedEventBusOptions DistributedEventBusOptions { get; } |
|
|
|
protected IConnectionPool ConnectionPool { get; } |
|
|
|
protected IRabbitMqSerializer Serializer { get; } |
|
|
|
@ -32,7 +32,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
protected IRabbitMqMessageConsumer Consumer { get; } |
|
|
|
|
|
|
|
public RabbitMqDistributedEventBus( |
|
|
|
IOptions<RabbitMqDistributedEventBusOptions> options, |
|
|
|
IOptions<RabbitMqEventBusOptions> options, |
|
|
|
IConnectionPool connectionPool, |
|
|
|
IRabbitMqSerializer serializer, |
|
|
|
IHybridServiceScopeFactory serviceScopeFactory, |
|
|
|
@ -44,24 +44,24 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
Serializer = serializer; |
|
|
|
MessageConsumerFactory = messageConsumerFactory; |
|
|
|
DistributedEventBusOptions = distributedEventBusOptions.Value; |
|
|
|
RabbitMqDistributedEventBusOptions = options.Value; |
|
|
|
RabbitMqEventBusOptions = options.Value; |
|
|
|
|
|
|
|
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); |
|
|
|
EventTypes = new ConcurrentDictionary<string, Type>(); |
|
|
|
|
|
|
|
Consumer = MessageConsumerFactory.Create( |
|
|
|
new ExchangeDeclareConfiguration( |
|
|
|
RabbitMqDistributedEventBusOptions.ExchangeName, |
|
|
|
RabbitMqEventBusOptions.ExchangeName, |
|
|
|
type: "direct", |
|
|
|
durable: true |
|
|
|
), |
|
|
|
new QueueDeclareConfiguration( |
|
|
|
RabbitMqDistributedEventBusOptions.ClientName, |
|
|
|
RabbitMqEventBusOptions.ClientName, |
|
|
|
durable: true, |
|
|
|
exclusive: false, |
|
|
|
autoDelete: false |
|
|
|
), |
|
|
|
RabbitMqDistributedEventBusOptions.ConnectionName |
|
|
|
RabbitMqEventBusOptions.ConnectionName |
|
|
|
); |
|
|
|
|
|
|
|
Consumer.OnMessageReceived(ProcessEventAsync); |
|
|
|
@ -96,8 +96,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
|
|
|
|
if (handlerFactories.Count == 1) //TODO: Multi-threading!
|
|
|
|
{ |
|
|
|
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|
|
|
Consumer.BindAsync(eventName); |
|
|
|
Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); |
|
|
|
} |
|
|
|
|
|
|
|
return new EventHandlerFactoryUnregistrar(this, eventType, factory); |
|
|
|
@ -162,10 +161,10 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|
|
|
var body = Serializer.Serialize(eventData); |
|
|
|
|
|
|
|
using (var channel = ConnectionPool.Get(RabbitMqDistributedEventBusOptions.ConnectionName).CreateModel()) |
|
|
|
using (var channel = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName).CreateModel()) |
|
|
|
{ |
|
|
|
channel.ExchangeDeclare( |
|
|
|
RabbitMqDistributedEventBusOptions.ExchangeName, |
|
|
|
RabbitMqEventBusOptions.ExchangeName, |
|
|
|
"direct", |
|
|
|
durable: true |
|
|
|
); |
|
|
|
@ -174,7 +173,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; |
|
|
|
|
|
|
|
channel.BasicPublish( |
|
|
|
exchange: RabbitMqDistributedEventBusOptions.ExchangeName, |
|
|
|
exchange: RabbitMqEventBusOptions.ExchangeName, |
|
|
|
routingKey: eventName, |
|
|
|
mandatory: true, |
|
|
|
basicProperties: properties, |
|
|
|
|