From ca9b3c54db0249be3213b9e42112e1f69bcb928b Mon Sep 17 00:00:00 2001 From: maliming Date: Sun, 22 Mar 2026 20:37:34 +0800 Subject: [PATCH] refactor: Remove dynamic event support from Dapr distributed event bus implementation --- .../event-bus/distributed/index.md | 2 + .../AbpAspNetCoreMvcDaprEventBusModule.cs | 10 -- .../EventBus/Dapr/DaprDistributedEventBus.cs | 96 ++++--------------- 3 files changed, 22 insertions(+), 86 deletions(-) diff --git a/docs/en/framework/infrastructure/event-bus/distributed/index.md b/docs/en/framework/infrastructure/event-bus/distributed/index.md index 74179bb30c..5f2c7b11ec 100644 --- a/docs/en/framework/infrastructure/event-bus/distributed/index.md +++ b/docs/en/framework/infrastructure/event-bus/distributed/index.md @@ -725,6 +725,8 @@ Configure(options => In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time, such as integrating with external systems or building plugin architectures. +> **Note:** Dynamic event subscriptions are supported by RabbitMQ, Kafka, Azure Service Bus, and Rebus providers. The **Dapr provider does not support dynamic events** because Dapr requires topic subscriptions to be declared at application startup and cannot add subscriptions at runtime. Attempting to call `Subscribe(string, ...)` on the Dapr provider will throw an `AbpException`. + ### Publishing Dynamic Events Use the `PublishAsync` overload that accepts a string event name: diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs index 19ca7dc71f..fba9e12707 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs @@ -103,11 +103,6 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType); await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); } - else if (distributedEventBus.IsDynamicEvent(daprEventData.Topic)) - { - var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object)); - await distributedEventBus.TriggerHandlersAsync(typeof(DynamicEventData), new DynamicEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId); - } } else { @@ -117,11 +112,6 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule var eventData = daprSerializer.Deserialize(data, eventType); await distributedEventBus.TriggerHandlersAsync(eventType, eventData); } - else if (distributedEventBus.IsDynamicEvent(topic)) - { - var eventData = daprSerializer.Deserialize(data, typeof(object)); - await distributedEventBus.TriggerHandlersAsync(typeof(DynamicEventData), new DynamicEventData(topic, eventData)); - } } httpContext.Response.StatusCode = 200; diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 3f4f1780db..d122d3e5e3 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -28,7 +28,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } - protected ConcurrentDictionary> DynamicHandlerFactories { get; } public DaprDistributedEventBus( IServiceScopeFactory serviceScopeFactory, @@ -59,7 +58,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); - DynamicHandlerFactories = new ConcurrentDictionary>(); } public void Initialize() @@ -84,16 +82,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend /// public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { - var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName); - - if (handler.IsInFactories(handlerFactories)) - { - return NullDisposable.Instance; - } - - handlerFactories.Add(handler); - - return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler); + throw new AbpException( + "Dapr distributed event bus does not support dynamic event subscriptions. " + + "Dapr requires topic subscriptions to be declared at startup and cannot add subscriptions at runtime. " + + "Use a typed event handler (IDistributedEventHandler) instead."); } public override void Unsubscribe(Func action) @@ -150,19 +142,16 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { var eventType = EventTypes.GetOrDefault(eventName); - var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); - if (eventType != null) { + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); } - if (DynamicHandlerFactories.ContainsKey(eventName)) - { - return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); - } - - throw new AbpException($"Unknown event name: {eventName}"); + throw new AbpException( + "Dapr distributed event bus does not support dynamic event publishing. " + + "Dapr requires topic subscriptions to be declared at startup. " + + "Use a typed event (PublishAsync) or ensure the event name matches a registered typed event."); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -179,21 +168,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - } - else if (DynamicHandlerFactories.ContainsKey(outgoingEvent.EventName)) - { - eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object)); - } - else + if (eventType == null) { return; } + var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); await PublishToDaprAsync(outgoingEvent.EventName, eventData, outgoingEvent.Id, outgoingEvent.GetCorrelationId()); using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) @@ -233,20 +213,13 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); object eventData; - if (eventType != null) - { - eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); - } - else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName)) - { - eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); - eventType = typeof(DynamicEventData); - } - else + if (eventType == null) { return; } + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -277,10 +250,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { - if (typeof(DynamicEventData) != eventType) - { - EventTypes.GetOrAdd(eventName, eventType); - } + EventTypes.GetOrAdd(eventName, eventType); return base.OnAddToOutboxAsync(eventName, eventType, eventData); } @@ -300,18 +270,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend protected override IEnumerable GetHandlerFactories(Type eventType) { var handlerFactoryList = new List(); - var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList(); foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) { handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); } - foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key))) - { - handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); - } - return handlerFactoryList.ToArray(); } @@ -327,33 +291,25 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public bool IsDynamicEvent(string eventName) { - return DynamicHandlerFactories.ContainsKey(eventName); + return false; } /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions."); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateDynamicHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions."); } /// public override void UnsubscribeAll(string eventName) { - GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear()); + throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions."); } protected override IEnumerable GetDynamicHandlerFactories(string eventName) @@ -364,19 +320,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return GetHandlerFactories(eventType); } - var result = new List(); - - foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName)) - { - result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); - } - - return result; - } - - private List GetOrCreateDynamicHandlerFactories(string eventName) - { - return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List()); + return Array.Empty(); } private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)