From fbe3fe10bfa534eff15aadab7ea8e4926bb1e5a0 Mon Sep 17 00:00:00 2001 From: maliming Date: Wed, 29 Jan 2025 13:51:58 +0800 Subject: [PATCH] Refactor `LocalDistributedEventBus`. --- .../EventTypeWithEventHandlerFactories.cs | 17 ++ .../Volo/Abp/EventBus/Local/ILocalEventBus.cs | 10 +- .../Distributed/DistributedEventBusBase.cs | 6 +- .../Distributed/LocalDistributedEventBus.cs | 181 ++++++++++-------- .../Volo/Abp/EventBus/EventBusBase.cs | 13 -- .../Volo/Abp/EventBus/Local/LocalEventBus.cs | 5 + .../Abp/EventBus/Local/NullLocalEventBus.cs | 6 + 7 files changed, 145 insertions(+), 93 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventTypeWithEventHandlerFactories.cs diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventTypeWithEventHandlerFactories.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventTypeWithEventHandlerFactories.cs new file mode 100644 index 0000000000..836d5cb486 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventTypeWithEventHandlerFactories.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; + +namespace Volo.Abp.EventBus; + +public class EventTypeWithEventHandlerFactories +{ + public Type EventType { get; } + + public List EventHandlerFactories { get; } + + public EventTypeWithEventHandlerFactories(Type eventType, List eventHandlerFactories) + { + EventType = eventType; + EventHandlerFactories = eventHandlerFactories; + } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs index 654895911d..e691b6c58c 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; namespace Volo.Abp.EventBus.Local; @@ -8,11 +9,18 @@ namespace Volo.Abp.EventBus.Local; public interface ILocalEventBus : IEventBus { /// - /// Registers to an event. + /// Registers to an event. /// Same (given) instance of the handler is used for all event occurrences. /// /// Event type /// Object to handle the event IDisposable Subscribe(ILocalEventHandler handler) where TEvent : class; + + /// + /// Gets the list of event handler factories for the given event type. + /// + /// Event type + /// + List GetEventHandlerFactories(Type eventType); } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index ad69ed124b..d6b4ea51bd 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -62,7 +62,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox); } - public async Task PublishAsync( + public virtual async Task PublishAsync( Type eventType, object eventData, bool onUnitOfWorkComplete = true, @@ -227,7 +227,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { try { - await LocalEventBus.PublishAsync(distributedEvent); + await LocalEventBus.PublishAsync(distributedEvent, onUnitOfWorkComplete: false); } catch (Exception) { @@ -239,7 +239,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { try { - await LocalEventBus.PublishAsync(distributedEvent); + await LocalEventBus.PublishAsync(distributedEvent, false); } catch (Exception) { diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index 79fb1add78..430c933bac 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Reflection; +using System.Text; using System.Text.Json; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -9,7 +11,6 @@ using Microsoft.Extensions.Options; using Volo.Abp.Collections; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Local; -using System.Linq; using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Timing; @@ -22,66 +23,32 @@ namespace Volo.Abp.EventBus.Distributed; [ExposeServices(typeof(IDistributedEventBus), typeof(LocalDistributedEventBus))] public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDependency { - protected ConcurrentDictionary> HandlerFactories { get; } - protected ConcurrentDictionary EventTypes { get; } - public LocalDistributedEventBus(IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, Volo.Abp.Uow.IUnitOfWorkManager unitOfWorkManager, IOptions abpDistributedEventBusOptions, - IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, ILocalEventBus localEventBus, ICorrelationIdProvider correlationIdProvider) - : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider) + public LocalDistributedEventBus( + IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, + IOptions abpDistributedEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock, + IEventHandlerInvoker eventHandlerInvoker, + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) + : base(serviceScopeFactory, + currentTenant, + unitOfWorkManager, + abpDistributedEventBusOptions, + guidGenerator, + clock, + eventHandlerInvoker, + localEventBus, + correlationIdProvider) { - HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); Subscribe(abpDistributedEventBusOptions.Value.Handlers); } - protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) - { - EventTypes.GetOrAdd(eventName, eventType); - return base.OnAddToOutboxAsync(eventName, eventType, eventData); - } - - - public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) - { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - if (eventType == null) - { - return; - } - - var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType); - - if (eventData == null) - { - return; - } - await LocalEventBus.PublishAsync(eventType, eventData); - - } - - public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) - { - var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); - if (eventType == null) - return; - var eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType); - if (eventData == null) - { - return; - } - await LocalEventBus.PublishAsync(eventType, eventData); - } - - public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) - { - foreach (var outgoingEvent in outgoingEvents) - { - await PublishFromOutboxAsync(outgoingEvent, outboxConfig); - } - } - - public virtual void Subscribe(ITypeList handlers) { foreach (var handler in handlers) @@ -103,7 +70,6 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } - public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { return LocalEventBus.Subscribe(eventType, factory); @@ -129,51 +95,114 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen LocalEventBus.UnsubscribeAll(eventType); } + public async override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + { + if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) + { + AddToUnitOfWork( + UnitOfWorkManager.Current, + new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox) + ); + return; + } + + if (useOutbox) + { + if (await AddToOutboxAsync(eventType, eventData)) + { + return; + } + } + + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Direct, + EventName = EventNameAttribute.GetNameOrDefault(eventType), + EventData = eventData + }); + + await TriggerDistributedEventReceivedAsync(new DistributedEventReceived + { + Source = DistributedEventSource.Direct, + EventName = EventNameAttribute.GetNameOrDefault(eventType), + EventData = eventData + }); + + await PublishToEventBusAsync(eventType, eventData); + } + + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + { + await LocalEventBus.PublishAsync(eventType, eventData, false); + } + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) { unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - protected override IEnumerable GetHandlerFactories(Type eventType) + public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - var handlerFactoryList = new List(); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); - foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) - ) + await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { - handlerFactoryList.Add( - new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); - } + Source = DistributedEventSource.Direct, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); - return handlerFactoryList.ToArray(); + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName)!; + var eventData = JsonSerializer.Deserialize(Encoding.UTF8.GetString(outgoingEvent.EventData), eventType)!; + await LocalEventBus.PublishAsync(eventType, eventData, false); } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { - //Should trigger same type - if (handlerEventType == targetEventType) + foreach (var outgoingEvent in outgoingEvents) { - return true; + await PublishFromOutboxAsync(outgoingEvent, outboxConfig); } + } - //Should trigger for inherited types - if (handlerEventType.IsAssignableFrom(targetEventType)) + public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) + { + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) { - return true; + return; } - return false; + var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData!, exceptions, inboxConfig); + } + if (exceptions.Any()) + { + ThrowOriginalExceptions(eventType, exceptions); + } } - - protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + protected override byte[] Serialize(object eventData) { - await LocalEventBus.PublishAsync(eventType, eventData); + return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventData)); + } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); } - protected override byte[] Serialize(object eventData) + protected override IEnumerable GetHandlerFactories(Type eventType) { - return JsonSerializer.SerializeToUtf8Bytes(eventData); + return LocalEventBus.GetEventHandlerFactories(eventType); } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index 151a149281..231b4c7a0f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -242,19 +242,6 @@ public abstract class EventBusBase : IEventBus }; } - protected class EventTypeWithEventHandlerFactories - { - public Type EventType { get; } - - public List EventHandlerFactories { get; } - - public EventTypeWithEventHandlerFactories(Type eventType, List eventHandlerFactories) - { - EventType = eventType; - EventHandlerFactories = eventHandlerFactories; - } - } - // Reference from // https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/ protected struct SynchronizationContextRemover : INotifyCompletion diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 48f70ac8c1..7123ff340a 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -136,6 +136,11 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData); } + public virtual List GetEventHandlerFactories(Type eventType) + { + return GetHandlerFactories(eventType).ToList(); + } + protected override IEnumerable GetHandlerFactories(Type eventType) { var handlerFactoryList = new List>(); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs index 94c5f4ff83..3ffcd911ce 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Local; @@ -22,6 +23,11 @@ public sealed class NullLocalEventBus : ILocalEventBus return NullDisposable.Instance; } + public List GetEventHandlerFactories(Type eventType) + { + return new List(); + } + public IDisposable Subscribe() where TEvent : class where THandler : IEventHandler, new() { return NullDisposable.Instance;