|
|
|
@ -1,11 +1,32 @@ |
|
|
|
using System; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using Microsoft.Extensions.Options; |
|
|
|
using Volo.Abp.DependencyInjection; |
|
|
|
using Volo.Abp.RabbitMQ; |
|
|
|
using RabbitMQ.Client; |
|
|
|
|
|
|
|
namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
{ |
|
|
|
/* Inspired from the implementation of "eShopOnContainers" |
|
|
|
* TODO: Implement Retry system |
|
|
|
* TODO: Should be improved |
|
|
|
*/ |
|
|
|
public class RabbitMqDistributedEventBus : IDistributedEventBus, ITransientDependency |
|
|
|
{ |
|
|
|
protected RabbitMqDistributedEventBusOptions Options { get; } |
|
|
|
protected IChannelPool ChannelPool { get; } |
|
|
|
protected IRabbitMqSerializer Serializer { get; } |
|
|
|
|
|
|
|
public RabbitMqDistributedEventBus( |
|
|
|
IOptions<RabbitMqDistributedEventBusOptions> options, |
|
|
|
IChannelPool channelPool, |
|
|
|
IRabbitMqSerializer serializer) |
|
|
|
{ |
|
|
|
ChannelPool = channelPool; |
|
|
|
Serializer = serializer; |
|
|
|
Options = options.Value; |
|
|
|
} |
|
|
|
|
|
|
|
public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class |
|
|
|
{ |
|
|
|
throw new NotImplementedException(); |
|
|
|
@ -39,12 +60,32 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq |
|
|
|
public Task PublishAsync<TEvent>(TEvent eventData) |
|
|
|
where TEvent : class |
|
|
|
{ |
|
|
|
throw new NotImplementedException(); |
|
|
|
return PublishAsync(typeof(TEvent), eventData); |
|
|
|
} |
|
|
|
|
|
|
|
public Task PublishAsync(Type eventType, object eventData) |
|
|
|
{ |
|
|
|
throw new NotImplementedException(); |
|
|
|
var eventName = eventType.FullName; //TODO: Get eventname from an attribute if available
|
|
|
|
var body = Serializer.Serialize(eventData); |
|
|
|
|
|
|
|
using (var channelAccessor = ChannelPool.Acquire(Guid.NewGuid().ToString())) |
|
|
|
{ |
|
|
|
//TODO: Other properties like durable?
|
|
|
|
channelAccessor.Channel.ExchangeDeclare(Options.ExchangeName, ""); |
|
|
|
|
|
|
|
var properties = channelAccessor.Channel.CreateBasicProperties(); |
|
|
|
properties.DeliveryMode = 2; //persistent
|
|
|
|
|
|
|
|
channelAccessor.Channel.BasicPublish( |
|
|
|
exchange: Options.ExchangeName, |
|
|
|
routingKey: eventName, |
|
|
|
mandatory: true, |
|
|
|
basicProperties: properties, |
|
|
|
body: body |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
return Task.CompletedTask; |
|
|
|
} |
|
|
|
} |
|
|
|
} |