From 2d84e52fcdb2175c98d2561b36d7cd7b0996970e Mon Sep 17 00:00:00 2001 From: Nuno Vieira Date: Tue, 21 Jan 2025 12:33:34 +0000 Subject: [PATCH] Changes to make LocalDistributedEventBus able to use outbox/inbox patterns. --- .../Distributed/LocalDistributedEventBus.cs | 197 +++++++++--------- 1 file changed, 102 insertions(+), 95 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index d653c63bb7..79fb1add78 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -1,35 +1,87 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Reflection; +using System.Text.Json; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Volo.Abp.Collections; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Local; +using System.Linq; +using Volo.Abp.Guids; +using Volo.Abp.MultiTenancy; +using Volo.Abp.Timing; +using Volo.Abp.Tracing; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Distributed; [Dependency(TryRegister = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(LocalDistributedEventBus))] -public class LocalDistributedEventBus : IDistributedEventBus, ISingletonDependency +public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDependency { - private readonly ILocalEventBus _localEventBus; + protected ConcurrentDictionary> HandlerFactories { get; } - protected IServiceScopeFactory ServiceScopeFactory { get; } + protected ConcurrentDictionary EventTypes { get; } - protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } + public LocalDistributedEventBus(IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, Volo.Abp.Uow.IUnitOfWorkManager unitOfWorkManager, IOptions abpDistributedEventBusOptions, + IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, ILocalEventBus localEventBus, ICorrelationIdProvider correlationIdProvider) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider) + { + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + Subscribe(abpDistributedEventBusOptions.Value.Handlers); + } - public LocalDistributedEventBus( - ILocalEventBus localEventBus, - IServiceScopeFactory serviceScopeFactory, - IOptions distributedEventBusOptions) + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { - _localEventBus = localEventBus; - ServiceScopeFactory = serviceScopeFactory; - AbpDistributedEventBusOptions = distributedEventBusOptions.Value; - Subscribe(distributedEventBusOptions.Value.Handlers); + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); } + + public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) + { + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) + { + return; + } + + var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType); + + if (eventData == null) + { + return; + } + await LocalEventBus.PublishAsync(eventType, eventData); + + } + + public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) + { + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + if (eventType == null) + return; + var eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType); + if (eventData == null) + { + return; + } + await LocalEventBus.PublishAsync(eventType, eventData); + } + + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + { + foreach (var outgoingEvent in outgoingEvents) + { + await PublishFromOutboxAsync(outgoingEvent, outboxConfig); + } + } + + public virtual void Subscribe(ITypeList handlers) { foreach (var handler in handlers) @@ -51,122 +103,77 @@ public class LocalDistributedEventBus : IDistributedEventBus, ISingletonDependen } } - /// - public virtual IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class - { - return Subscribe(typeof(TEvent), handler); - } - public IDisposable Subscribe(Func action) where TEvent : class + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { - return _localEventBus.Subscribe(action); + return LocalEventBus.Subscribe(eventType, factory); } - public IDisposable Subscribe(ILocalEventHandler handler) where TEvent : class + public override void Unsubscribe(Func action) { - return _localEventBus.Subscribe(handler); + LocalEventBus.Unsubscribe(action); } - public IDisposable Subscribe() where TEvent : class where THandler : IEventHandler, new() + public override void Unsubscribe(Type eventType, IEventHandler handler) { - return _localEventBus.Subscribe(); + LocalEventBus.Unsubscribe(eventType, handler); } - public IDisposable Subscribe(Type eventType, IEventHandler handler) + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) { - return _localEventBus.Subscribe(eventType, handler); + LocalEventBus.Unsubscribe(eventType, factory); } - public IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class + public override void UnsubscribeAll(Type eventType) { - return _localEventBus.Subscribe(factory); + LocalEventBus.UnsubscribeAll(eventType); } - public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) { - return _localEventBus.Subscribe(eventType, factory); + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public void Unsubscribe(Func action) where TEvent : class + protected override IEnumerable GetHandlerFactories(Type eventType) { - _localEventBus.Unsubscribe(action); - } + var handlerFactoryList = new List(); - public void Unsubscribe(ILocalEventHandler handler) where TEvent : class - { - _localEventBus.Unsubscribe(handler); - } - - public void Unsubscribe(Type eventType, IEventHandler handler) - { - _localEventBus.Unsubscribe(eventType, handler); - } - - public void Unsubscribe(IEventHandlerFactory factory) where TEvent : class - { - _localEventBus.Unsubscribe(factory); - } + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) + ) + { + handlerFactoryList.Add( + new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } - public void Unsubscribe(Type eventType, IEventHandlerFactory factory) - { - _localEventBus.Unsubscribe(eventType, factory); + return handlerFactoryList.ToArray(); } - public void UnsubscribeAll() where TEvent : class + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { - _localEventBus.UnsubscribeAll(); - } + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } - public void UnsubscribeAll(Type eventType) - { - _localEventBus.UnsubscribeAll(eventType); - } + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } - public async Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) - where TEvent : class - { - await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); - await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + return false; } - public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) - { - await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete); - await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); - } - public async Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); - await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); - } + await LocalEventBus.PublishAsync(eventType, eventData); - public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) - { - await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete); - await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); } - private async Task PublishDistributedEventSentReceivedAsync(Type eventType, object eventData, bool onUnitOfWorkComplete) + protected override byte[] Serialize(object eventData) { - if (eventType != typeof(DistributedEventSent)) - { - await _localEventBus.PublishAsync(new DistributedEventSent - { - Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData - }, onUnitOfWorkComplete); - } - - if (eventType != typeof(DistributedEventReceived)) - { - await _localEventBus.PublishAsync(new DistributedEventReceived - { - Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData - }, onUnitOfWorkComplete); - } + return JsonSerializer.SerializeToUtf8Bytes(eventData); } -} \ No newline at end of file +}