using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;

namespace Volo.Abp.EventBus.RabbitMq;

/* TODO: How to handle unsubscribe to unbind on RabbitMq (may not be possible for)
 */
/// <summary>
/// Distributed event bus that publishes events to a RabbitMQ exchange (routing key = event name)
/// and dispatches messages received on the client queue to the registered handler factories.
/// Typed handlers are keyed by event <see cref="Type"/>; anonymous handlers are keyed by event name.
/// </summary>
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus), typeof(IRabbitMqDistributedEventBus))]
public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDistributedEventBus, ISingletonDependency
{
    protected AbpRabbitMqEventBusOptions AbpRabbitMqEventBusOptions { get; }
    protected IConnectionPool ConnectionPool { get; }
    protected IRabbitMqSerializer Serializer { get; }

    //TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
    // Writers (Subscribe/Unsubscribe*) lock on the individual list; readers in this class do not.

    /// <summary>Handler factories registered per typed event.</summary>
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }

    /// <summary>Maps an event name (the RabbitMQ routing key) to its CLR event type.</summary>
    protected ConcurrentDictionary<string, Type> EventTypes { get; }

    /// <summary>Handler factories registered per event name for events without a known CLR type.</summary>
    protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousHandlerFactories { get; }

    protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; }

    /// <summary>The message consumer; assigned by <see cref="Initialize"/>.</summary>
    protected IRabbitMqMessageConsumer Consumer { get; private set; } = default!;

    // Set once EnsureExchangeExistsAsync has verified or declared the exchange.
    private bool _exchangeCreated;

    public RabbitMqDistributedEventBus(
        IOptions<AbpRabbitMqEventBusOptions> options,
        IConnectionPool connectionPool,
        IRabbitMqSerializer serializer,
        IServiceScopeFactory serviceScopeFactory,
        IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
        IRabbitMqMessageConsumerFactory messageConsumerFactory,
        ICurrentTenant currentTenant,
        IUnitOfWorkManager unitOfWorkManager,
        IGuidGenerator guidGenerator,
        IClock clock,
        IEventHandlerInvoker eventHandlerInvoker,
        ILocalEventBus localEventBus,
        ICorrelationIdProvider correlationIdProvider)
        : base(
            serviceScopeFactory,
            currentTenant,
            unitOfWorkManager,
            distributedEventBusOptions,
            guidGenerator,
            clock,
            eventHandlerInvoker,
            localEventBus,
            correlationIdProvider)
    {
        ConnectionPool = connectionPool;
        Serializer = serializer;
        MessageConsumerFactory = messageConsumerFactory;
        AbpRabbitMqEventBusOptions = options.Value;

        HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
        EventTypes = new ConcurrentDictionary<string, Type>();
        AnonymousHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
    }

    /// <summary>
    /// Creates the consumer for the configured exchange and client queue, registers
    /// <see cref="ProcessEventAsync"/> as its message callback and subscribes the handlers
    /// listed in the distributed event bus options.
    /// </summary>
    public virtual void Initialize()
    {
        Consumer = MessageConsumerFactory.Create(
            new ExchangeDeclareConfiguration(
                AbpRabbitMqEventBusOptions.ExchangeName,
                type: AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
                durable: true,
                arguments: AbpRabbitMqEventBusOptions.ExchangeArguments
            ),
            new QueueDeclareConfiguration(
                AbpRabbitMqEventBusOptions.ClientName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                prefetchCount: AbpRabbitMqEventBusOptions.PrefetchCount,
                arguments: AbpRabbitMqEventBusOptions.QueueArguments
            ),
            AbpRabbitMqEventBusOptions.ConnectionName
        );

        Consumer.OnMessageReceived(ProcessEventAsync);

        SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
    }

    /// <summary>
    /// Consumer callback: resolves the event type from the routing key, deserializes the body,
    /// then either hands the message to the inbox or triggers the handlers directly.
    /// Messages whose name has neither a typed nor an anonymous registration are ignored.
    /// </summary>
    private async Task ProcessEventAsync(IChannel channel, BasicDeliverEventArgs ea)
    {
        var eventName = ea.RoutingKey;
        var eventType = EventTypes.GetOrDefault(eventName);
        object eventData;

        if (eventType != null)
        {
            eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
        }
        else if (AnonymousHandlerFactories.ContainsKey(eventName))
        {
            eventType = typeof(AnonymousEventData);
            eventData = new AnonymousEventData(eventName, Serializer.Deserialize<object>(ea.Body.ToArray()));
        }
        else
        {
            return;
        }

        var correlationId = ea.BasicProperties.CorrelationId;

        // A true result means the inbox took the message; it is processed later via ProcessFromInboxAsync.
        if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId))
        {
            return;
        }

        using (CorrelationIdProvider.Change(correlationId))
        {
            await TriggerHandlersDirectAsync(eventType, eventData);
        }
    }

    /// <inheritdoc/>
    public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
    {
        var handlerFactories = GetOrCreateHandlerFactories(eventType);

        if (!TryAddHandlerFactory(handlerFactories, factory, out var isFirstFactory))
        {
            return NullDisposable.Instance;
        }

        if (isFirstFactory)
        {
            // NOTE(review): the result of BindAsync is not awaited (Subscribe is synchronous),
            // so a binding failure is not observed by the caller here.
            Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
        }

        return new EventHandlerFactoryUnregistrar(this, eventType, factory);
    }

    /// <inheritdoc/>
    public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
    {
        var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);

        if (!TryAddHandlerFactory(handlerFactories, handler, out var isFirstFactory))
        {
            return NullDisposable.Instance;
        }

        if (isFirstFactory)
        {
            // NOTE(review): the result of BindAsync is not awaited (Subscribe is synchronous),
            // so a binding failure is not observed by the caller here.
            Consumer.BindAsync(eventName);
        }

        return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
    }

    /// <inheritdoc/>
    public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
    {
        Check.NotNull(action, nameof(action));

        GetOrCreateHandlerFactories(typeof(TEvent))
            .Locking(factories =>
            {
                factories.RemoveAll(
                    factory =>
                        factory is SingleInstanceHandlerFactory { HandlerInstance: ActionEventHandler<TEvent> actionHandler } &&
                        actionHandler.Action == action);
            });
    }

    /// <inheritdoc/>
    public override void Unsubscribe(Type eventType, IEventHandler handler)
    {
        GetOrCreateHandlerFactories(eventType)
            .Locking(factories =>
            {
                factories.RemoveAll(
                    factory =>
                        factory is SingleInstanceHandlerFactory singleFactory &&
                        singleFactory.HandlerInstance == handler
                );
            });
    }

    /// <inheritdoc/>
    public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
    {
        GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
    }

    /// <inheritdoc/>
    public override void UnsubscribeAll(Type eventType)
    {
        GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
    }

    /// <inheritdoc/>
    /// <exception cref="AbpException">
    /// Thrown when <paramref name="eventName"/> has neither a typed nor an anonymous registration.
    /// </exception>
    public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
    {
        var eventType = EventTypes.GetOrDefault(eventName);
        var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);

        // A known CLR type wins: the payload is converted and published as a typed event.
        if (eventType != null)
        {
            return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
        }

        if (AnonymousHandlerFactories.ContainsKey(eventName))
        {
            return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
        }

        throw new AbpException($"Unknown event name: {eventName}");
    }

    /// <inheritdoc/>
    protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
    {
        await PublishAsync(eventType, eventData, correlationId: CorrelationIdProvider.Get());
    }

    /// <inheritdoc/>
    protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
    {
        unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
    }

    /// <inheritdoc/>
    public override async Task PublishFromOutboxAsync(
        OutgoingEventInfo outgoingEvent,
        OutboxConfig outboxConfig)
    {
        await PublishAsync(
            outgoingEvent.EventName,
            outgoingEvent.EventData,
            eventId: outgoingEvent.Id,
            correlationId: outgoingEvent.GetCorrelationId());

        using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
        {
            await TriggerDistributedEventSentAsync(new DistributedEventSent()
            {
                Source = DistributedEventSource.Outbox,
                EventName = outgoingEvent.EventName,
                EventData = outgoingEvent.EventData
            });
        }
    }

    /// <inheritdoc/>
    public override async Task PublishManyFromOutboxAsync(
        IEnumerable<OutgoingEventInfo> outgoingEvents,
        OutboxConfig outboxConfig)
    {
        // One channel with publisher confirmations enabled is shared by the whole batch.
        using (var channel = await (await ConnectionPool.GetAsync(AbpRabbitMqEventBusOptions.ConnectionName))
                   .CreateChannelAsync(new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, new ThrottlingRateLimiter(256))))
        {
            // Materialize once so the incoming sequence is enumerated a single time.
            var outgoingEventArray = outgoingEvents.ToArray();

            foreach (var outgoingEvent in outgoingEventArray)
            {
                await PublishAsync(
                    channel,
                    outgoingEvent.EventName,
                    outgoingEvent.EventData,
                    eventId: outgoingEvent.Id,
                    correlationId: outgoingEvent.GetCorrelationId());

                using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
                {
                    await TriggerDistributedEventSentAsync(new DistributedEventSent()
                    {
                        Source = DistributedEventSource.Outbox,
                        EventName = outgoingEvent.EventName,
                        EventData = outgoingEvent.EventData
                    });
                }
            }
        }
    }

    /// <inheritdoc/>
    public override async Task ProcessFromInboxAsync(
        IncomingEventInfo incomingEvent,
        InboxConfig inboxConfig)
    {
        var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
        object eventData;

        if (eventType != null)
        {
            eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
        }
        else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
        {
            eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize<object>(incomingEvent.EventData));
            eventType = typeof(AnonymousEventData);
        }
        else
        {
            // No typed or anonymous registration for this event name: nothing to process.
            return;
        }

        var exceptions = new List<Exception>();

        using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
        {
            await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
        }

        if (exceptions.Count > 0)
        {
            ThrowOriginalExceptions(eventType, exceptions);
        }
    }

    /// <inheritdoc/>
    protected override byte[] Serialize(object eventData)
    {
        return Serializer.Serialize(eventData);
    }

    /// <summary>
    /// Resolves the event name and payload for <paramref name="eventType"/>, serializes the payload
    /// and publishes it on a newly created channel.
    /// </summary>
    /// <param name="eventType">CLR type of the event.</param>
    /// <param name="eventData">Event payload.</param>
    /// <param name="headersArguments">Optional message headers copied onto the message properties.</param>
    /// <param name="eventId">Optional id used as the message id; a new GUID is generated when null.</param>
    /// <param name="correlationId">Optional correlation id set on the message properties.</param>
    public virtual Task PublishAsync(
        Type eventType,
        object eventData,
        Dictionary<string, object>? headersArguments = null,
        Guid? eventId = null,
        string? correlationId = null)
    {
        var (eventName, resolvedData) = ResolveEventForPublishing(eventType, eventData);
        var body = Serializer.Serialize(resolvedData);

        return PublishAsync(eventName, body, headersArguments, eventId, correlationId);
    }

    /// <summary>
    /// Publishes an already serialized event body on a channel created for this call only.
    /// </summary>
    protected virtual async Task PublishAsync(
        string eventName,
        byte[] body,
        Dictionary<string, object>? headersArguments = null,
        Guid? eventId = null,
        string? correlationId = null)
    {
        using (var channel = await (await ConnectionPool.GetAsync(AbpRabbitMqEventBusOptions.ConnectionName)).CreateChannelAsync())
        {
            await PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId);
        }
    }

    /// <summary>
    /// Publishes an already serialized event body on <paramref name="channel"/> as a persistent
    /// message, using <paramref name="eventName"/> as the routing key.
    /// </summary>
    protected virtual async Task PublishAsync(
        IChannel channel,
        string eventName,
        byte[] body,
        Dictionary<string, object>? headersArguments = null,
        Guid? eventId = null,
        string? correlationId = null)
    {
        await EnsureExchangeExistsAsync(channel);

        // The properties object is created here, so its MessageId is always unset at this point;
        // assign it directly: the supplied event id, or a fresh GUID, formatted without dashes.
        var properties = new BasicProperties
        {
            DeliveryMode = DeliveryModes.Persistent,
            MessageId = (eventId ?? GuidGenerator.Create()).ToString("N")
        };

        if (correlationId != null)
        {
            properties.CorrelationId = correlationId;
        }

        SetEventMessageHeaders(properties, headersArguments);

        await channel.BasicPublishAsync(
            exchange: AbpRabbitMqEventBusOptions.ExchangeName,
            routingKey: eventName,
            mandatory: false,
            basicProperties: properties,
            body: body
        );
    }

    /// <summary>
    /// Makes sure the configured exchange exists before the first publish. The exchange is probed
    /// passively on a temporary channel; if that throws, it is declared on <paramref name="channel"/>.
    /// The outcome is remembered in a flag so subsequent calls return immediately.
    /// </summary>
    protected virtual async Task EnsureExchangeExistsAsync(IChannel channel)
    {
        if (_exchangeCreated)
        {
            return;
        }

        try
        {
            // NOTE(review): a separate channel is used for the probe, presumably so that a failed
            // passive declare does not affect the publishing channel - confirm.
            using (var temporaryChannel = await (await ConnectionPool.GetAsync(AbpRabbitMqEventBusOptions.ConnectionName)).CreateChannelAsync())
            {
                await temporaryChannel.ExchangeDeclarePassiveAsync(AbpRabbitMqEventBusOptions.ExchangeName);
            }
        }
        catch (Exception)
        {
            // Declare with the same type, durability and arguments that Initialize() passes to the
            // consumer, so both declarations of this exchange agree.
            await channel.ExchangeDeclareAsync(
                AbpRabbitMqEventBusOptions.ExchangeName,
                AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
                durable: true,
                arguments: AbpRabbitMqEventBusOptions.ExchangeArguments
            );
        }

        _exchangeCreated = true;
    }

    /// <summary>
    /// Copies <paramref name="headersArguments"/> into the message headers, creating the header
    /// dictionary when it is missing. Does nothing when no headers were supplied.
    /// </summary>
    protected virtual void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object>? headersArguments)
    {
        if (headersArguments == null)
        {
            return;
        }

        properties.Headers ??= new Dictionary<string, object?>();

        foreach (var header in headersArguments)
        {
            properties.Headers[header.Key] = header.Value;
        }
    }

    /// <inheritdoc/>
    protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
    {
        // Remember the name -> type mapping for typed events only.
        if (typeof(AnonymousEventData) != eventType)
        {
            EventTypes.GetOrAdd(eventName, eventType);
        }

        return base.OnAddToOutboxAsync(eventName, eventType, eventData);
    }

    /// <summary>
    /// Returns the handler factory list for <paramref name="eventType"/>, creating it (and
    /// registering the event name -> type mapping) on first use.
    /// </summary>
    private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
    {
        return HandlerFactories.GetOrAdd(
            eventType,
            type =>
            {
                var eventName = EventNameAttribute.GetNameOrDefault(type);
                EventTypes.GetOrAdd(eventName, type);
                return new List<IEventHandlerFactory>();
            }
        );
    }

    /// <inheritdoc/>
    protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
    {
        var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();

        // Names of all registered event types that should be triggered for the given type;
        // used below to pick up anonymous handlers registered under those names.
        var eventNames = EventTypes
            .Where(x => ShouldTriggerEventForHandler(eventType, x.Value))
            .Select(x => x.Key)
            .ToList();

        foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
        {
            handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
        }

        foreach (var handlerFactory in AnonymousHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
        {
            handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
        }

        return handlerFactoryList.ToArray();
    }

    /// <inheritdoc/>
    protected override Type? GetEventTypeByEventName(string eventName)
    {
        return EventTypes.GetOrDefault(eventName);
    }

    /// <inheritdoc/>
    public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
    {
        GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
    }

    /// <inheritdoc/>
    public override void Unsubscribe(string eventName, IEventHandler handler)
    {
        GetOrCreateAnonymousHandlerFactories(eventName)
            .Locking(factories =>
            {
                factories.RemoveAll(
                    factory =>
                        factory is SingleInstanceHandlerFactory singleFactory &&
                        singleFactory.HandlerInstance == handler
                );
            });
    }

    /// <inheritdoc/>
    public override void UnsubscribeAll(string eventName)
    {
        GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
    }

    /// <inheritdoc/>
    protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
    {
        // When the name maps to a CLR type, the typed lookup is used instead.
        var eventType = GetEventTypeByEventName(eventName);
        if (eventType != null)
        {
            return GetHandlerFactories(eventType);
        }

        var result = new List<EventTypeWithEventHandlerFactories>();

        foreach (var handlerFactory in AnonymousHandlerFactories.Where(hf => hf.Key == eventName))
        {
            result.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
        }

        return result;
    }

    /// <summary>
    /// Returns the anonymous handler factory list for <paramref name="eventName"/>, creating it on first use.
    /// </summary>
    private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
    {
        return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
    }

    /// <summary>
    /// Adds <paramref name="factory"/> to <paramref name="handlerFactories"/> unless it is already
    /// registered. The check and the insertion run inside <c>Locking</c> on the list, the same
    /// call the Unsubscribe overloads use to mutate it.
    /// </summary>
    /// <param name="handlerFactories">The list to add to.</param>
    /// <param name="factory">The factory to register.</param>
    /// <param name="isFirstFactory">True when the list contains exactly one entry after the add.</param>
    /// <returns>True when the factory was added; false when it was already registered.</returns>
    private static bool TryAddHandlerFactory(
        List<IEventHandlerFactory> handlerFactories,
        IEventHandlerFactory factory,
        out bool isFirstFactory)
    {
        // Lambdas cannot write to out parameters, so the results are captured in locals.
        var added = false;
        var first = false;

        handlerFactories.Locking(factories =>
        {
            if (factory.IsInFactories(factories))
            {
                return;
            }

            factories.Add(factory);
            added = true;
            first = factories.Count == 1;
        });

        isFirstFactory = first;
        return added;
    }

    /// <summary>
    /// Decides whether handlers registered for <paramref name="handlerEventType"/> should run for
    /// an event of <paramref name="targetEventType"/>: same type, or the handler type is assignable
    /// from the target type.
    /// </summary>
    private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
    {
        //Should trigger same type
        if (handlerEventType == targetEventType)
        {
            return true;
        }

        //TODO: Support inheritance? But it does not support on subscription to RabbitMq!
        //Should trigger for inherited types
        if (handlerEventType.IsAssignableFrom(targetEventType))
        {
            return true;
        }

        return false;
    }
}