Browse Source

refactor: Remove dynamic event support from Dapr distributed event bus implementation

pull/25023/head
maliming 1 week ago
parent
commit
ca9b3c54db
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 2
      docs/en/framework/infrastructure/event-bus/distributed/index.md
  2. 10
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs
  3. 96
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

2
docs/en/framework/infrastructure/event-bus/distributed/index.md

@ -725,6 +725,8 @@ Configure<AbpDistributedEventBusOptions>(options =>
In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time, such as integrating with external systems or building plugin architectures. In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time, such as integrating with external systems or building plugin architectures.
> **Note:** Dynamic event subscriptions are supported by RabbitMQ, Kafka, Azure Service Bus, and Rebus providers. The **Dapr provider does not support dynamic events** because Dapr requires topic subscriptions to be declared at application startup and cannot add subscriptions at runtime. Attempting to call `Subscribe(string, ...)` on the Dapr provider will throw an `AbpException`.
### Publishing Dynamic Events ### Publishing Dynamic Events
Use the `PublishAsync` overload that accepts a string event name: Use the `PublishAsync` overload that accepts a string event name:

10
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs

@ -103,11 +103,6 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType); var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId);
} }
else if (distributedEventBus.IsDynamicEvent(daprEventData.Topic))
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(DynamicEventData), new DynamicEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId);
}
} }
else else
{ {
@ -117,11 +112,6 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
var eventData = daprSerializer.Deserialize(data, eventType); var eventData = daprSerializer.Deserialize(data, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData); await distributedEventBus.TriggerHandlersAsync(eventType, eventData);
} }
else if (distributedEventBus.IsDynamicEvent(topic))
{
var eventData = daprSerializer.Deserialize(data, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(DynamicEventData), new DynamicEventData(topic, eventData));
}
} }
httpContext.Response.StatusCode = 200; httpContext.Response.StatusCode = 200;

96
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

@ -28,7 +28,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; } protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicHandlerFactories { get; }
public DaprDistributedEventBus( public DaprDistributedEventBus(
IServiceScopeFactory serviceScopeFactory, IServiceScopeFactory serviceScopeFactory,
@ -59,7 +58,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>(); EventTypes = new ConcurrentDictionary<string, Type>();
DynamicHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
} }
public void Initialize() public void Initialize()
@ -84,16 +82,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
/// <inheritdoc/> /// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{ {
var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName); throw new AbpException(
"Dapr distributed event bus does not support dynamic event subscriptions. " +
if (handler.IsInFactories(handlerFactories)) "Dapr requires topic subscriptions to be declared at startup and cannot add subscriptions at runtime. " +
{ "Use a typed event handler (IDistributedEventHandler<T>) instead.");
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
} }
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action) public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
@ -150,19 +142,16 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{ {
var eventType = EventTypes.GetOrDefault(eventName); var eventType = EventTypes.GetOrDefault(eventName);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null) if (eventType != null)
{ {
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete);
} }
if (DynamicHandlerFactories.ContainsKey(eventName)) throw new AbpException(
{ "Dapr distributed event bus does not support dynamic event publishing. " +
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); "Dapr requires topic subscriptions to be declared at startup. " +
} "Use a typed event (PublishAsync<TEvent>) or ensure the event name matches a registered typed event.");
throw new AbpException($"Unknown event name: {eventName}");
} }
protected async override Task PublishToEventBusAsync(Type eventType, object eventData) protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -179,21 +168,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{ {
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
object eventData; if (eventType == null)
if (eventType != null)
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (DynamicHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object));
}
else
{ {
return; return;
} }
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
await PublishToDaprAsync(outgoingEvent.EventName, eventData, outgoingEvent.Id, outgoingEvent.GetCorrelationId()); await PublishToDaprAsync(outgoingEvent.EventName, eventData, outgoingEvent.Id, outgoingEvent.GetCorrelationId());
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
@ -233,20 +213,13 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData; object eventData;
if (eventType != null) if (eventType == null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(DynamicEventData);
}
else
{ {
return; return;
} }
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>(); var exceptions = new List<Exception>();
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{ {
@ -277,10 +250,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{ {
if (typeof(DynamicEventData) != eventType) EventTypes.GetOrAdd(eventName, eventType);
{
EventTypes.GetOrAdd(eventName, eventType);
}
return base.OnAddToOutboxAsync(eventName, eventType, eventData); return base.OnAddToOutboxAsync(eventName, eventType, eventData);
} }
@ -300,18 +270,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType) protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{ {
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>(); var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
{ {
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
} }
foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return handlerFactoryList.ToArray(); return handlerFactoryList.ToArray();
} }
@ -327,33 +291,25 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public bool IsDynamicEvent(string eventName) public bool IsDynamicEvent(string eventName)
{ {
return DynamicHandlerFactories.ContainsKey(eventName); return false;
} }
/// <inheritdoc/> /// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory) public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{ {
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions.");
} }
/// <inheritdoc/> /// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler) public override void Unsubscribe(string eventName, IEventHandler handler)
{ {
GetOrCreateDynamicHandlerFactories(eventName) throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions.");
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
} }
/// <inheritdoc/> /// <inheritdoc/>
public override void UnsubscribeAll(string eventName) public override void UnsubscribeAll(string eventName)
{ {
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear()); throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions.");
} }
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName) protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
@ -364,19 +320,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return GetHandlerFactories(eventType); return GetHandlerFactories(eventType);
} }
var result = new List<EventTypeWithEventHandlerFactories>(); return Array.Empty<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName))
{
result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return result;
}
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
} }
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

Loading…
Cancel
Save