Browse Source

refactor: Replace AnonymousEventData with DynamicEventData across the event bus implementation

pull/25023/head
maliming 3 months ago
parent
commit
fb6f4722ff
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 80
      docs/en/framework/infrastructure/event-bus/distributed/index.md
  2. 51
      docs/en/framework/infrastructure/event-bus/local/index.md
  3. 10
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs
  4. 6
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs
  5. 46
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/DynamicEventData.cs
  6. 2
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/IEventBus.cs
  7. 2
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs
  8. 50
      framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs
  9. 50
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
  10. 50
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  11. 50
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  12. 58
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  13. 16
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs
  14. 40
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs
  15. 2
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs
  16. 18
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs
  17. 4
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs
  18. 42
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs
  19. 2
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs
  20. 52
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs
  21. 30
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Dynamic_Test.cs
  22. 24
      test/DistEvents/DistDemoApp.Shared/DistEventScenarioProfile.cs
  23. 44
      test/DistEvents/DistDemoApp.Shared/DistEventScenarioRunner.cs

80
docs/en/framework/infrastructure/event-bus/distributed/index.md

@ -721,6 +721,86 @@ Configure<AbpDistributedEventBusOptions>(options =>
});
````
## Dynamic (String-Based) Events
In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time, such as integrating with external systems or building plugin architectures.
### Publishing Dynamic Events
Use the `PublishAsync` overload that accepts a string event name:
````csharp
await distributedEventBus.PublishAsync(
"MyDynamicEvent",
new Dictionary<string, object>
{
["UserId"] = 42,
["Name"] = "John"
}
);
````
If a typed event exists with the given name (via `EventNameAttribute` or convention), the data is automatically deserialized and routed to the typed handler. Otherwise, it is delivered as a `DynamicEventData` to dynamic handlers.
You can also control `onUnitOfWorkComplete` and `useOutbox` parameters:
````csharp
await distributedEventBus.PublishAsync(
"MyDynamicEvent",
new { UserId = 42, Name = "John" },
onUnitOfWorkComplete: true,
useOutbox: true
);
````
### Subscribing to Dynamic Events
Use the `Subscribe` overload that accepts a string event name:
````csharp
var subscription = distributedEventBus.Subscribe(
"MyDynamicEvent",
new SingleInstanceHandlerFactory(
new ActionEventHandler<DynamicEventData>(eventData =>
{
// Access the event name
var name = eventData.EventName;
// Convert to a loosely-typed object (Dictionary/List/primitives)
var obj = eventData.ConvertToTypedObject();
// Or convert to a strongly-typed object
var typed = eventData.ConvertToTypedObject<MyEventDto>();
return Task.CompletedTask;
})));
// Unsubscribe when done
subscription.Dispose();
````
You can also subscribe using a typed distributed event handler:
````csharp
distributedEventBus.Subscribe("MyDynamicEvent", myDistributedEventHandler);
````
Where `myDistributedEventHandler` implements `IDistributedEventHandler<DynamicEventData>`.
### Mixed Typed and Dynamic Handlers
When both a typed handler and a dynamic handler are registered for the same event name, **both** handlers are triggered. The typed handler receives the deserialized typed data, while the dynamic handler receives a `DynamicEventData` wrapper.
### DynamicEventData Class
The `DynamicEventData` class wraps the event payload with a string-based event name:
- **`EventName`**: The string name that identifies the event.
- **`Data`**: The raw event data payload.
- **`ConvertToTypedObject()`**: Converts data to a loosely-typed object graph (dictionaries, lists, primitives).
- **`ConvertToTypedObject<T>()`**: Deserializes data to a strongly-typed `T` object.
- **`ConvertToTypedObject(Type type)`**: Deserializes data to the specified type.
## See Also
* [Local Event Bus](../local)

51
docs/en/framework/infrastructure/event-bus/local/index.md

@ -249,6 +249,57 @@ If you set it to `false`, the `EntityUpdatedEventData<T>` will not be published
> This option is only used for the EF Core.
## Dynamic (String-Based) Events
In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time.
### Publishing Dynamic Events
Use the `PublishAsync` overload that accepts a string event name:
````csharp
await localEventBus.PublishAsync(
"MyDynamicEvent",
new Dictionary<string, object>
{
["UserId"] = 42,
["Name"] = "John"
}
);
````
If a typed event exists with the given name (via `EventNameAttribute` or convention), the data is automatically converted and routed to the typed handler. Otherwise, it is delivered as a `DynamicEventData` to dynamic handlers.
### Subscribing to Dynamic Events
Use the `Subscribe` overload that accepts a string event name:
````csharp
var subscription = localEventBus.Subscribe(
"MyDynamicEvent",
new SingleInstanceHandlerFactory(
new ActionEventHandler<DynamicEventData>(eventData =>
{
// Access the event name
var name = eventData.EventName;
// Convert to a loosely-typed object (Dictionary/List/primitives)
var obj = eventData.ConvertToTypedObject();
// Or convert to a strongly-typed object
var typed = eventData.ConvertToTypedObject<MyEventDto>();
return Task.CompletedTask;
})));
// Unsubscribe when done
subscription.Dispose();
````
### Mixed Typed and Dynamic Handlers
When both a typed handler and a dynamic handler are registered for the same event name, **both** handlers are triggered. The typed handler receives the converted typed data, while the dynamic handler receives a `DynamicEventData` wrapper.
## See Also
* [Distributed Event Bus](../distributed)

10
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs

@ -102,10 +102,11 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId);
}else if (distributedEventBus.IsAnonymousEvent(daprEventData.Topic))
}
else if (distributedEventBus.IsDynamicEvent(daprEventData.Topic))
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId);
await distributedEventBus.TriggerHandlersAsync(typeof(DynamicEventData), new DynamicEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId);
}
}
else
@ -115,10 +116,11 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
{
var eventData = daprSerializer.Deserialize(data, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData);
}else if (distributedEventBus.IsAnonymousEvent(topic))
}
else if (distributedEventBus.IsDynamicEvent(topic))
{
var eventData = daprSerializer.Deserialize(data, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(topic, eventData));
await distributedEventBus.TriggerHandlersAsync(typeof(DynamicEventData), new DynamicEventData(topic, eventData));
}
}

6
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs

@ -45,15 +45,15 @@ public interface IDistributedEventBus : IEventBus
/// <summary>
/// Registers to an event by its string-based event name.
/// Same (given) instance of the handler is used for all event occurrences.
/// Wraps the handler as <see cref="IDistributedEventHandler{AnonymousEventData}"/>.
/// Wraps the handler as <see cref="IDistributedEventHandler{DynamicEventData}"/>.
/// </summary>
/// <param name="eventName">Name of the event</param>
/// <param name="handler">Object to handle the event</param>
IDisposable Subscribe(string eventName, IDistributedEventHandler<AnonymousEventData> handler);
IDisposable Subscribe(string eventName, IDistributedEventHandler<DynamicEventData> handler);
/// <summary>
/// Triggers an event by its string-based event name.
/// Used for anonymous (type-less) event publishing over distributed event bus.
/// Used for dynamic (type-less) event publishing over distributed event bus.
/// </summary>
/// <param name="eventName">Name of the event</param>
/// <param name="eventData">Related data for the event</param>

46
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs → framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/DynamicEventData.cs

@ -6,53 +6,35 @@ using System.Text.Json;
namespace Volo.Abp.EventBus;
/// <summary>
/// Wraps arbitrary event data with a string-based event name for anonymous (type-less) event handling.
/// Acts as both an envelope and event type for events that are identified by name rather than CLR type.
/// Wraps arbitrary event data with a string-based event name for dynamic (type-less) event handling.
/// </summary>
[Serializable]
public class AnonymousEventData
public class DynamicEventData
{
/// <summary>
/// The string-based name that identifies the event.
/// </summary>
public string EventName { get; }
/// <summary>
/// The raw event data payload. Can be a CLR object, <see cref="JsonElement"/>, or any serializable object.
/// </summary>
public object Data { get; }
private JsonElement? _cachedJsonElement;
/// <summary>
/// Creates a new instance of <see cref="AnonymousEventData"/>.
/// </summary>
/// <param name="eventName">The string-based name that identifies the event</param>
/// <param name="data">The raw event data payload</param>
public AnonymousEventData(string eventName, object data)
public DynamicEventData(string eventName, object data)
{
EventName = eventName;
Data = data;
EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName));
Data = Check.NotNull(data, nameof(data));
}
/// <summary>
/// Converts the <see cref="Data"/> to a loosely-typed object graph
/// Converts <see cref="Data"/> to a loosely-typed object graph
/// (dictionaries for objects, lists for arrays, primitives for values).
/// </summary>
/// <returns>A CLR object representation of the event data</returns>
public object ConvertToTypedObject()
{
return ConvertElement(GetJsonElement());
}
/// <summary>
/// Converts the <see cref="Data"/> to a strongly-typed <typeparamref name="T"/> object.
/// Returns the data directly if it is already of type <typeparamref name="T"/>,
/// otherwise deserializes from JSON.
/// Converts <see cref="Data"/> to a strongly-typed <typeparamref name="T"/> object.
/// Returns the data directly if it is already of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Target type to convert to</typeparam>
/// <returns>The deserialized object of type <typeparamref name="T"/></returns>
/// <exception cref="InvalidOperationException">Thrown when deserialization fails</exception>
public T ConvertToTypedObject<T>()
{
if (Data is T typedData)
@ -61,17 +43,13 @@ public class AnonymousEventData
}
return GetJsonElement().Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}.");
?? throw new InvalidOperationException($"Failed to deserialize DynamicEventData to {typeof(T).FullName}.");
}
/// <summary>
/// Converts the <see cref="Data"/> to the specified <paramref name="type"/>.
/// Returns the data directly if it is already an instance of the target type,
/// otherwise deserializes from JSON.
/// Converts <see cref="Data"/> to the specified <paramref name="type"/>.
/// Returns the data directly if it is already an instance of the target type.
/// </summary>
/// <param name="type">Target type to convert to</param>
/// <returns>The deserialized object</returns>
/// <exception cref="InvalidOperationException">Thrown when deserialization fails</exception>
public object ConvertToTypedObject(Type type)
{
if (type.IsInstanceOfType(Data))
@ -80,7 +58,7 @@ public class AnonymousEventData
}
return GetJsonElement().Deserialize(type)
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}.");
?? throw new InvalidOperationException($"Failed to deserialize DynamicEventData to {type.FullName}.");
}
private JsonElement GetJsonElement()

2
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/IEventBus.cs

@ -26,7 +26,7 @@ public interface IEventBus
/// <summary>
/// Triggers an event by its string-based event name.
/// Used for anonymous (type-less) event publishing.
/// Used for dynamic (type-less) event publishing.
/// </summary>
/// <param name="eventName">Name of the event</param>
/// <param name="eventData">Related data for the event</param>

2
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs

@ -29,5 +29,5 @@ public interface ILocalEventBus : IEventBus
/// </summary>
/// <param name="eventName">Name of the event</param>
/// <returns>List of event handler factories registered for the given event name</returns>
List<EventTypeWithEventHandlerFactories> GetAnonymousEventHandlerFactories(string eventName);
List<EventTypeWithEventHandlerFactories> GetDynamicEventHandlerFactories(string eventName);
}

50
framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

@ -30,7 +30,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected IAzureServiceBusSerializer Serializer { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousHandlerFactories { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicHandlerFactories { get; }
protected IAzureServiceBusMessageConsumer Consumer { get; private set; } = default!;
public AzureDistributedEventBus(
@ -63,7 +63,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
PublisherPool = publisherPool;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
DynamicHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
}
public void Initialize()
@ -92,11 +92,11 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(message.Body.ToArray(), eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(eventName))
else if (DynamicHandlerFactories.ContainsKey(eventName))
{
var data = Serializer.Deserialize<object>(message.Body.ToArray());
eventData = new AnonymousEventData(eventName, data);
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(eventName, data);
eventType = typeof(DynamicEventData);
}
else
{
@ -131,7 +131,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName);
if (handler.IsInFactories(handlerFactories))
{
@ -140,7 +140,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
@ -200,16 +200,16 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
if (DynamicHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
@ -294,11 +294,11 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName))
{
var element = Serializer.Deserialize<object>(incomingEvent.EventData);
eventData = new AnonymousEventData(incomingEvent.EventName, element);
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(incomingEvent.EventName, element);
eventType = typeof(DynamicEventData);
}
else
{
@ -354,7 +354,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
if (typeof(AnonymousEventData) != eventType)
if (typeof(DynamicEventData) != eventType)
{
EventTypes.GetOrAdd(eventName, eventType);
}
@ -384,9 +384,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return handlerFactoryList.ToArray();
@ -400,14 +400,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
@ -421,11 +421,11 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
var result = new List<EventTypeWithEventHandlerFactories>();
@ -435,17 +435,17 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
result.AddRange(GetHandlerFactories(eventType));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(hf => hf.Key == eventName))
foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName))
{
result.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return result;
}
private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

50
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

@ -28,7 +28,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousHandlerFactories { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicHandlerFactories { get; }
public DaprDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
@ -59,7 +59,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
DynamicHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
}
public void Initialize()
@ -84,7 +84,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName);
if (handler.IsInFactories(handlerFactories))
{
@ -93,7 +93,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
@ -150,16 +150,16 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
if (DynamicHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
@ -185,7 +185,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object));
}
@ -237,10 +237,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(DynamicEventData);
}
else
{
@ -277,7 +277,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
if (typeof(AnonymousEventData) != eventType)
if (typeof(DynamicEventData) != eventType)
{
EventTypes.GetOrAdd(eventName, eventType);
}
@ -307,9 +307,9 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return handlerFactoryList.ToArray();
@ -325,21 +325,21 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return EventTypes.GetOrDefault(eventName);
}
public bool IsAnonymousEvent(string eventName)
public bool IsDynamicEvent(string eventName)
{
return AnonymousHandlerFactories.ContainsKey(eventName);
return DynamicHandlerFactories.ContainsKey(eventName);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
@ -353,10 +353,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
var result = new List<EventTypeWithEventHandlerFactories>();
@ -366,17 +366,17 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
result.AddRange(GetHandlerFactories(eventType));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(hf => hf.Key == eventName))
foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName))
{
result.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return result;
}
private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

50
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -30,7 +30,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected IProducerPool ProducerPool { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousHandlerFactories { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicHandlerFactories { get; }
protected IKafkaMessageConsumer Consumer { get; private set; } = default!;
public KafkaDistributedEventBus(
@ -65,7 +65,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
DynamicHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
}
public void Initialize()
@ -92,11 +92,11 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(message.Value, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(eventName))
else if (DynamicHandlerFactories.ContainsKey(eventName))
{
var element = Serializer.Deserialize<object>(message.Value);
eventData = new AnonymousEventData(eventName, element);
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(eventName, element);
eventType = typeof(DynamicEventData);
}
else
{
@ -131,7 +131,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName);
if (handler.IsInFactories(handlerFactories))
{
@ -140,7 +140,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
}
/// <inheritdoc/>
@ -201,16 +201,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
if (DynamicHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
@ -332,11 +332,11 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName))
{
var element = Serializer.Deserialize<object>(incomingEvent.EventData);
eventData = new AnonymousEventData(incomingEvent.EventName, element);
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(incomingEvent.EventName, element);
eventType = typeof(DynamicEventData);
}
else
{
@ -390,7 +390,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
if (typeof(AnonymousEventData) != eventType)
if (typeof(DynamicEventData) != eventType)
{
EventTypes.GetOrAdd(eventName, eventType);
}
@ -420,9 +420,9 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return handlerFactoryList.ToArray();
@ -436,13 +436,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
@ -456,10 +456,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
var result = new List<EventTypeWithEventHandlerFactories>();
@ -469,17 +469,17 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
result.AddRange(GetHandlerFactories(eventType));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(hf => hf.Key == eventName))
foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName))
{
result.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return result;
}
private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

50
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -33,7 +33,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
//TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousHandlerFactories { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicHandlerFactories { get; }
protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; }
protected IRabbitMqMessageConsumer Consumer { get; private set; } = default!;
@ -71,7 +71,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
DynamicHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
}
public virtual void Initialize()
@ -109,10 +109,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
{
eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(eventName))
else if (DynamicHandlerFactories.ContainsKey(eventName))
{
eventType = typeof(AnonymousEventData);
eventData = new AnonymousEventData(eventName, Serializer.Deserialize<object>(ea.Body.ToArray()));
eventType = typeof(DynamicEventData);
eventData = new DynamicEventData(eventName, Serializer.Deserialize<object>(ea.Body.ToArray()));
}
else
{
@ -153,7 +153,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName);
if (handler.IsInFactories(handlerFactories))
{
@ -167,7 +167,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
Consumer.BindAsync(eventName);
}
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
}
/// <inheritdoc/>
@ -228,16 +228,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
if (DynamicHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
@ -312,10 +312,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize<object>(incomingEvent.EventData));
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize<object>(incomingEvent.EventData));
eventType = typeof(DynamicEventData);
}
else
{
@ -441,7 +441,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
if (typeof(AnonymousEventData) != eventType)
if (typeof(DynamicEventData) != eventType)
{
EventTypes.GetOrAdd(eventName, eventType);
}
@ -471,9 +471,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return handlerFactoryList.ToArray();
@ -487,13 +487,13 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
@ -507,10 +507,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
var result = new List<EventTypeWithEventHandlerFactories>();
@ -520,17 +520,17 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
return GetHandlerFactories(eventType);
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(hf => hf.Key == eventName))
foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName))
{
result.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return result;
}
private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

58
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -31,7 +31,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
//TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousHandlerFactories { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicHandlerFactories { get; }
protected AbpRebusEventBusOptions AbpRebusEventBusOptions { get; }
public RebusDistributedEventBus(
@ -64,7 +64,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
DynamicHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
}
public void Initialize()
@ -76,9 +76,9 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
var messageId = MessageContext.Current.TransportMessage.GetMessageId();
string eventName;
if (eventType == typeof(AnonymousEventData) && eventData is AnonymousEventData anonymousEventData)
if (eventType == typeof(DynamicEventData) && eventData is DynamicEventData dynamicEventData)
{
eventName = anonymousEventData.EventName;
eventName = dynamicEventData.EventName;
}
else
{
@ -119,7 +119,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName);
if (handler.IsInFactories(handlerFactories))
{
@ -128,12 +128,12 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
handlerFactories.Add(handler);
if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading!
if (DynamicHandlerFactories.Count == 1) //TODO: Multi-threading!
{
Rebus.Subscribe(typeof(AnonymousEventData));
Rebus.Subscribe(typeof(DynamicEventData));
}
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
@ -194,16 +194,16 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
if (DynamicHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
@ -235,10 +235,10 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object)));
eventType = typeof(DynamicEventData);
}
else
{
@ -299,10 +299,10 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(DynamicEventData);
}
else
{
@ -347,7 +347,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
if (typeof(AnonymousEventData) != eventType)
if (typeof(DynamicEventData) != eventType)
{
EventTypes.GetOrAdd(eventName, eventType);
}
@ -377,9 +377,9 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return handlerFactoryList.ToArray();
@ -393,13 +393,13 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
@ -413,10 +413,10 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
var result = new List<EventTypeWithEventHandlerFactories>();
@ -426,17 +426,17 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
result.AddRange(GetHandlerFactories(eventType));
}
foreach (var handlerFactory in AnonymousHandlerFactories.Where(hf => hf.Key == eventName))
foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName))
{
result.Add(new EventTypeWithEventHandlerFactories(typeof(AnonymousEventData), handlerFactory.Value));
result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value));
}
return result;
}
private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

16
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -50,7 +50,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
}
/// <inheritdoc/>
public virtual IDisposable Subscribe(string eventName, IDistributedEventHandler<AnonymousEventData> handler)
public virtual IDisposable Subscribe(string eventName, IDistributedEventHandler<DynamicEventData> handler)
{
return Subscribe(eventName, (IEventHandler)handler);
}
@ -113,14 +113,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
bool useOutbox = true)
{
var eventType = GetEventTypeByEventName(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete, useOutbox);
}
public abstract Task PublishFromOutboxAsync(
@ -285,9 +285,9 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
protected virtual string GetEventName(Type eventType, object eventData)
{
if (eventData is AnonymousEventData anonymousEventData)
if (eventData is DynamicEventData dynamicEventData)
{
return anonymousEventData.EventName;
return dynamicEventData.EventName;
}
return EventNameAttribute.GetNameOrDefault(eventType);
@ -295,9 +295,9 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
protected virtual object GetEventData(object eventData)
{
if (eventData is AnonymousEventData anonymousEventData)
if (eventData is DynamicEventData dynamicEventData)
{
return anonymousEventData.Data;
return dynamicEventData.Data;
}
return eventData;

40
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -24,7 +24,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, bool> AnonymousEventNames { get; }
protected ConcurrentDictionary<string, bool> DynamicEventNames { get; }
public LocalDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
@ -47,7 +47,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
correlationIdProvider)
{
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousEventNames = new ConcurrentDictionary<string, bool>();
DynamicEventNames = new ConcurrentDictionary<string, bool>();
Subscribe(abpDistributedEventBusOptions.Value.Handlers);
}
@ -75,7 +75,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
AnonymousEventNames.GetOrAdd(eventName, true);
DynamicEventNames.GetOrAdd(eventName, true);
return LocalEventBus.Subscribe(eventName, handler);
}
@ -173,19 +173,19 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
}
if (!AnonymousEventNames.ContainsKey(eventName))
if (!DynamicEventNames.ContainsKey(eventName))
{
throw new AbpException($"Unknown event name: {eventName}");
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete, useOutbox);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -222,19 +222,19 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
if (eventType == null)
{
var isAnonymous = AnonymousEventNames.ContainsKey(outgoingEvent.EventName);
if (!isAnonymous)
var isDynamic = DynamicEventNames.ContainsKey(outgoingEvent.EventName);
if (!isDynamic)
{
return;
}
eventType = typeof(AnonymousEventData);
eventType = typeof(DynamicEventData);
}
object eventData;
if (eventType == typeof(AnonymousEventData))
if (eventType == typeof(DynamicEventData))
{
eventData = new AnonymousEventData(
eventData = new DynamicEventData(
outgoingEvent.EventName,
JsonSerializer.Deserialize<object>(outgoingEvent.EventData)!);
}
@ -264,19 +264,19 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
var isAnonymous = AnonymousEventNames.ContainsKey(incomingEvent.EventName);
if (!isAnonymous)
var isDynamic = DynamicEventNames.ContainsKey(incomingEvent.EventName);
if (!isDynamic)
{
return;
}
eventType = typeof(AnonymousEventData);
eventType = typeof(DynamicEventData);
}
object eventData;
if (eventType == typeof(AnonymousEventData))
if (eventType == typeof(DynamicEventData))
{
eventData = new AnonymousEventData(
eventData = new DynamicEventData(
incomingEvent.EventName,
JsonSerializer.Deserialize<object>(incomingEvent.EventData)!);
}
@ -303,7 +303,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
if (eventType != typeof(AnonymousEventData))
if (eventType != typeof(DynamicEventData))
{
EventTypes.GetOrAdd(eventName, eventType);
}
@ -315,9 +315,9 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return LocalEventBus.GetEventHandlerFactories(eventType);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
return LocalEventBus.GetAnonymousEventHandlerFactories(eventName);
return LocalEventBus.GetDynamicEventHandlerFactories(eventName);
}
protected override Type? GetEventTypeByEventName(string eventName)

2
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs

@ -51,7 +51,7 @@ public sealed class NullDistributedEventBus : IDistributedEventBus
}
/// <inheritdoc/>
public IDisposable Subscribe(string eventName, IDistributedEventHandler<AnonymousEventData> handler)
public IDisposable Subscribe(string eventName, IDistributedEventHandler<DynamicEventData> handler)
{
return NullDisposable.Instance;
}

18
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs

@ -174,7 +174,7 @@ public abstract class EventBusBase : IEventBus
actualEventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType))
{
var resolvedEventData = eventData is AnonymousEventData aed
var resolvedEventData = eventData is DynamicEventData aed
? aed.ConvertToTypedObject(actualEventType)
: eventData;
@ -194,11 +194,11 @@ public abstract class EventBusBase : IEventBus
Type eventType,
object eventData)
{
if (eventData is AnonymousEventData anonymousEventData)
if (eventData is DynamicEventData dynamicEventData)
{
return (
GetAnonymousHandlerFactories(anonymousEventData.EventName).ToList(),
GetEventTypeByEventName(anonymousEventData.EventName)
GetDynamicHandlerFactories(dynamicEventData.EventName).ToList(),
GetEventTypeByEventName(dynamicEventData.EventName)
);
}
@ -207,14 +207,14 @@ public abstract class EventBusBase : IEventBus
protected virtual object ResolveEventDataForHandler(object eventData, Type sourceEventType, Type handlerEventType)
{
if (eventData is AnonymousEventData anonymousEventData && handlerEventType != typeof(AnonymousEventData))
if (eventData is DynamicEventData dynamicEventData && handlerEventType != typeof(DynamicEventData))
{
return anonymousEventData.ConvertToTypedObject(handlerEventType);
return dynamicEventData.ConvertToTypedObject(handlerEventType);
}
if (handlerEventType == typeof(AnonymousEventData) && eventData is not AnonymousEventData)
if (handlerEventType == typeof(DynamicEventData) && eventData is not DynamicEventData)
{
return new AnonymousEventData(EventNameAttribute.GetNameOrDefault(sourceEventType), eventData);
return new DynamicEventData(EventNameAttribute.GetNameOrDefault(sourceEventType), eventData);
}
return eventData;
@ -256,7 +256,7 @@ public abstract class EventBusBase : IEventBus
protected abstract IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType);
protected abstract IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName);
protected abstract IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName);
protected abstract Type? GetEventTypeByEventName(string eventName);

4
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs

@ -27,13 +27,13 @@ public class EventHandlerFactoryUnregistrar : IDisposable
/// <summary>
/// Used to unregister an <see cref="IEventHandlerFactory"/> for a string-based event name on <see cref="IDisposable.Dispose"/> method.
/// </summary>
public class AnonymousEventHandlerFactoryUnregistrar : IDisposable
public class DynamicEventHandlerFactoryUnregistrar : IDisposable
{
private readonly IEventBus _eventBus;
private readonly string _eventName;
private readonly IEventHandlerFactory _factory;
public AnonymousEventHandlerFactoryUnregistrar(IEventBus eventBus, string eventName, IEventHandlerFactory factory)
public DynamicEventHandlerFactoryUnregistrar(IEventBus eventBus, string eventName, IEventHandlerFactory factory)
{
_eventBus = eventBus;
_eventName = eventName;

42
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

@ -32,7 +32,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> AnonymousEventHandlerFactories { get; }
protected ConcurrentDictionary<string, List<IEventHandlerFactory>> DynamicEventHandlerFactories { get; }
public LocalEventBus(
IOptions<AbpLocalEventBusOptions> options,
@ -47,7 +47,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
AnonymousEventHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
DynamicEventHandlerFactories = new ConcurrentDictionary<string, List<IEventHandlerFactory>>();
SubscribeHandlers(Options.Handlers);
}
@ -60,7 +60,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories =>
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
@ -68,7 +68,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
}
});
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler);
}
/// <inheritdoc/>
@ -140,13 +140,13 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
GetOrCreateDynamicHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
@ -166,7 +166,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear());
}
/// <inheritdoc/>
@ -174,20 +174,20 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
return PublishAsync(eventType, dynamicEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
var isAnonymous = AnonymousEventHandlerFactories.ContainsKey(eventName);
if (!isAnonymous)
var isDynamic = DynamicEventHandlerFactories.ContainsKey(eventName);
if (!isDynamic)
{
throw new AbpException($"Unknown event name: {eventName}");
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
@ -211,9 +211,9 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
}
/// <inheritdoc/>
public virtual List<EventTypeWithEventHandlerFactories> GetAnonymousEventHandlerFactories(string eventName)
public virtual List<EventTypeWithEventHandlerFactories> GetDynamicEventHandlerFactories(string eventName)
{
return GetAnonymousHandlerFactories(eventName).ToList();
return GetDynamicHandlerFactories(eventName).ToList();
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
@ -232,13 +232,13 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
}
}
foreach (var handlerFactory in AnonymousEventHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
foreach (var handlerFactory in DynamicEventHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key)))
{
foreach (var factory in handlerFactory.Value)
{
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
typeof(AnonymousEventData),
typeof(DynamicEventData),
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
}
}
@ -246,7 +246,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
return handlerFactoryList.OrderBy(x => x.Item3).Select(x => new EventTypeWithEventHandlerFactories(x.Item2, new List<IEventHandlerFactory> {x.Item1})).ToArray();
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetDynamicHandlerFactories(string eventName)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType != null)
@ -256,7 +256,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
var handlerFactoryList = new List<Tuple<IEventHandlerFactory, Type, int>>();
foreach (var handlerFactory in AnonymousEventHandlerFactories.Where(aehf => aehf.Key == eventName))
foreach (var handlerFactory in DynamicEventHandlerFactories.Where(aehf => aehf.Key == eventName))
{
foreach (var factory in handlerFactory.Value)
{
@ -264,7 +264,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
var handlerType = handler.EventHandler.GetType();
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
typeof(AnonymousEventData),
typeof(DynamicEventData),
ReflectionHelper
.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(handlerType)
.FirstOrDefault()?.Order ?? 0));
@ -285,9 +285,9 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
return HandlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
}
private List<IEventHandlerFactory> GetOrCreateAnonymousHandlerFactories(string eventName)
private List<IEventHandlerFactory> GetOrCreateDynamicHandlerFactories(string eventName)
{
return AnonymousEventHandlerFactories.GetOrAdd(eventName, (name) => new List<IEventHandlerFactory>());
return DynamicEventHandlerFactories.GetOrAdd(eventName, (name) => new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

2
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs

@ -35,7 +35,7 @@ public sealed class NullLocalEventBus : ILocalEventBus
}
/// <inheritdoc/>
public List<EventTypeWithEventHandlerFactories> GetAnonymousEventHandlerFactories(string eventName)
public List<EventTypeWithEventHandlerFactories> GetDynamicEventHandlerFactories(string eventName)
{
return new List<EventTypeWithEventHandlerFactories>();
}

52
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs

@ -49,13 +49,13 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
[Fact]
public async Task Should_Handle_Anonymous_Handler_When_Published_With_EventName()
public async Task Should_Handle_Dynamic_Handler_When_Published_With_EventName()
{
var handleCount = 0;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
using var subscription = DistributedEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
handleCount++;
await Task.CompletedTask;
@ -73,58 +73,58 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
[Fact]
public async Task Should_Handle_Anonymous_Handler_When_Published_With_AnonymousEventData()
public async Task Should_Handle_Dynamic_Handler_When_Published_With_DynamicEventData()
{
var handleCount = 0;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
using var subscription = DistributedEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
handleCount++;
d.ConvertToTypedObject().ShouldNotBeNull();
await Task.CompletedTask;
})));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new MySimpleEventData(1)));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new Dictionary<string, object>()
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new MySimpleEventData(1)));
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new Dictionary<string, object>()
{
{"Value", 2}
}));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new { Value = 3 }));
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new { Value = 3 }));
Assert.Equal(3, handleCount);
}
[Fact]
public async Task Should_Handle_Typed_Handler_When_Published_With_AnonymousEventData()
public async Task Should_Handle_Typed_Handler_When_Published_With_DynamicEventData()
{
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
var eventName = EventNameAttribute.GetNameOrDefault<MySimpleEventData>();
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new MySimpleEventData(1)));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new Dictionary<string, object>()
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new MySimpleEventData(1)));
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new Dictionary<string, object>()
{
{"Value", 2}
}));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new { Value = 3 }));
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new { Value = 3 }));
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
}
[Fact]
public async Task Should_Trigger_Both_Typed_And_Anonymous_Handlers_For_Typed_Event()
public async Task Should_Trigger_Both_Typed_And_Dynamic_Handlers_For_Typed_Event()
{
using var typedSubscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
var eventName = EventNameAttribute.GetNameOrDefault<MySimpleEventData>();
var anonymousHandleCount = 0;
var dynamicHandleCount = 0;
using var anonymousSubscription = DistributedEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
using var dynamicSubscription = DistributedEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
anonymousHandleCount++;
dynamicHandleCount++;
await Task.CompletedTask;
})));
@ -133,42 +133,42 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
await DistributedEventBus.PublishAsync(new MySimpleEventData(3));
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
Assert.Equal(3, anonymousHandleCount);
Assert.Equal(3, dynamicHandleCount);
}
[Fact]
public async Task Should_Trigger_Both_Handlers_For_Mixed_Typed_And_Anonymous_Publish()
public async Task Should_Trigger_Both_Handlers_For_Mixed_Typed_And_Dynamic_Publish()
{
using var typedSubscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
var eventName = EventNameAttribute.GetNameOrDefault<MySimpleEventData>();
var anonymousHandleCount = 0;
var dynamicHandleCount = 0;
using var anonymousSubscription = DistributedEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
using var dynamicSubscription = DistributedEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
anonymousHandleCount++;
dynamicHandleCount++;
await Task.CompletedTask;
})));
await DistributedEventBus.PublishAsync(new MySimpleEventData(1));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new Dictionary<string, object>()
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new Dictionary<string, object>()
{
{"Value", 2}
}));
await DistributedEventBus.PublishAsync(new AnonymousEventData(eventName, new { Value = 3 }));
await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new { Value = 3 }));
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
Assert.Equal(3, anonymousHandleCount);
Assert.Equal(3, dynamicHandleCount);
}
[Fact]
public async Task Should_Unsubscribe_Anonymous_Handler()
public async Task Should_Unsubscribe_Dynamic_Handler()
{
var handleCount = 0;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
var handler = new ActionEventHandler<AnonymousEventData>(async (d) =>
var handler = new ActionEventHandler<DynamicEventData>(async (d) =>
{
handleCount++;
await Task.CompletedTask;
@ -194,7 +194,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
[Fact]
public async Task Should_Convert_AnonymousEventData_To_Typed_Object()
public async Task Should_Convert_DynamicEventData_To_Typed_Object()
{
MySimpleEventData? receivedData = null;

30
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs → framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Dynamic_Test.cs

@ -6,16 +6,16 @@ using Xunit;
namespace Volo.Abp.EventBus.Local;
public class LocalEventBus_Anonymous_Test : EventBusTestBase
public class LocalEventBus_Dynamic_Test : EventBusTestBase
{
[Fact]
public async Task Should_Handle_Anonymous_Handler_With_EventName()
public async Task Should_Handle_Dynamic_Handler_With_EventName()
{
var handleCount = 0;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
handleCount++;
d.EventName.ShouldBe(eventName);
@ -67,10 +67,10 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
}
[Fact]
public async Task Should_Trigger_Both_Typed_And_Anonymous_Handlers()
public async Task Should_Trigger_Both_Typed_And_Dynamic_Handlers()
{
var typedHandleCount = 0;
var anonymousHandleCount = 0;
var dynamicHandleCount = 0;
using var typedSubscription = LocalEventBus.Subscribe<MySimpleEventData>(async (data) =>
{
@ -80,26 +80,26 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
var eventName = EventNameAttribute.GetNameOrDefault<MySimpleEventData>();
using var anonymousSubscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
using var dynamicSubscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
anonymousHandleCount++;
dynamicHandleCount++;
await Task.CompletedTask;
})));
await LocalEventBus.PublishAsync(new MySimpleEventData(1));
typedHandleCount.ShouldBe(1);
anonymousHandleCount.ShouldBe(1);
dynamicHandleCount.ShouldBe(1);
}
[Fact]
public async Task Should_Unsubscribe_Anonymous_Handler()
public async Task Should_Unsubscribe_Dynamic_Handler()
{
var handleCount = 0;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
var handler = new ActionEventHandler<AnonymousEventData>(async (d) =>
var handler = new ActionEventHandler<DynamicEventData>(async (d) =>
{
handleCount++;
await Task.CompletedTask;
@ -125,13 +125,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
}
[Fact]
public async Task Should_ConvertToTypedObject_In_Anonymous_Handler()
public async Task Should_ConvertToTypedObject_In_Dynamic_Handler()
{
object? receivedData = null;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
receivedData = d.ConvertToTypedObject();
await Task.CompletedTask;
@ -146,13 +146,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
}
[Fact]
public async Task Should_ConvertToTypedObject_Generic_In_Anonymous_Handler()
public async Task Should_ConvertToTypedObject_Generic_In_Dynamic_Handler()
{
MySimpleEventData? receivedData = null;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
new SingleInstanceHandlerFactory(new ActionEventHandler<DynamicEventData>(async (d) =>
{
receivedData = d.ConvertToTypedObject<MySimpleEventData>();
await Task.CompletedTask;

24
test/DistEvents/DistDemoApp.Shared/DistEventScenarioProfile.cs

@ -4,19 +4,19 @@ public class DistEventScenarioProfile
{
public string Name { get; set; } = "default";
public string AnonymousOnlyEventName { get; set; } = "dist-demo.anonymous-only";
public string DynamicOnlyEventName { get; set; } = "dist-demo.dynamic-only";
public string AnonymousOnlyMessage { get; set; } = "hello-anonymous";
public string DynamicOnlyMessage { get; set; } = "hello-dynamic";
public int TypedFromTypedValue { get; set; } = 7;
public int TypedFromAnonymousValue { get; set; } = 11;
public int TypedFromDynamicValue { get; set; } = 11;
public bool EnableTypedFromTypedScenario { get; set; } = true;
public bool EnableTypedFromAnonymousScenario { get; set; } = true;
public bool EnableTypedFromDynamicScenario { get; set; } = true;
public bool EnableAnonymousOnlyScenario { get; set; } = true;
public bool EnableDynamicOnlyScenario { get; set; } = true;
public bool OnUnitOfWorkComplete { get; set; } = true;
@ -38,11 +38,11 @@ public class DistEventScenarioProfile
return new DistEventScenarioProfile
{
Name = "dapr-web",
AnonymousOnlyEventName = "dist-demo.dapr.anonymous-only",
AnonymousOnlyMessage = "hello-dapr-web",
DynamicOnlyEventName = "dist-demo.dapr.dynamic-only",
DynamicOnlyMessage = "hello-dapr-web",
EnableTypedFromTypedScenario = false,
EnableTypedFromAnonymousScenario = false,
EnableAnonymousOnlyScenario = false
EnableTypedFromDynamicScenario = false,
EnableDynamicOnlyScenario = false
};
}
@ -51,10 +51,10 @@ public class DistEventScenarioProfile
return new DistEventScenarioProfile
{
Name = "azure-emulator",
AnonymousOnlyEventName = "DistDemoApp.Azure.AnonymousOnly",
AnonymousOnlyMessage = "hello-azure-emulator",
DynamicOnlyEventName = "DistDemoApp.Azure.DynamicOnly",
DynamicOnlyMessage = "hello-azure-emulator",
TypedFromTypedValue = 21,
TypedFromAnonymousValue = 34
TypedFromDynamicValue = 34
};
}
}

44
test/DistEvents/DistDemoApp.Shared/DistEventScenarioRunner.cs

@ -28,10 +28,10 @@ public class DistEventScenarioRunner : IDistEventScenarioRunner, ITransientDepen
var typedFromTypedPublish = profile.EnableTypedFromTypedScenario
? new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously)
: null;
var typedFromAnonymousPublish = profile.EnableTypedFromAnonymousScenario
var typedFromDynamicPublish = profile.EnableTypedFromDynamicScenario
? new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously)
: null;
var anonymousOnlyPublish = profile.EnableAnonymousOnlyScenario
var dynamicOnlyPublish = profile.EnableDynamicOnlyScenario
? new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously)
: null;
@ -42,28 +42,28 @@ public class DistEventScenarioRunner : IDistEventScenarioRunner, ITransientDepen
typedFromTypedPublish.TrySetResult(eventData.Value);
}
if (typedFromAnonymousPublish != null && eventData.Value == profile.TypedFromAnonymousValue)
if (typedFromDynamicPublish != null && eventData.Value == profile.TypedFromDynamicValue)
{
typedFromAnonymousPublish.TrySetResult(eventData.Value);
typedFromDynamicPublish.TrySetResult(eventData.Value);
}
return Task.CompletedTask;
});
IDisposable? anonymousOnlySubscription = null;
if (profile.EnableAnonymousOnlyScenario)
IDisposable? dynamicOnlySubscription = null;
if (profile.EnableDynamicOnlyScenario)
{
anonymousOnlySubscription = _distributedEventBus.Subscribe(
profile.AnonymousOnlyEventName,
dynamicOnlySubscription = _distributedEventBus.Subscribe(
profile.DynamicOnlyEventName,
new SingleInstanceHandlerFactory(
new ActionEventHandler<AnonymousEventData>(eventData =>
new ActionEventHandler<DynamicEventData>(eventData =>
{
var converted = AnonymousEventDataConverter.ConvertToLooseObject(eventData);
var converted = DynamicEventDataConverter.ConvertToLooseObject(eventData);
if (converted is Dictionary<string, object> payload &&
payload.TryGetValue("Message", out var message) &&
message?.ToString() == profile.AnonymousOnlyMessage)
message?.ToString() == profile.DynamicOnlyMessage)
{
anonymousOnlyPublish!.TrySetResult(true);
dynamicOnlyPublish!.TrySetResult(true);
}
return Task.CompletedTask;
@ -88,17 +88,17 @@ public class DistEventScenarioRunner : IDistEventScenarioRunner, ITransientDepen
await typedFromTypedPublish.Task.WaitAsync(TimeSpan.FromSeconds(profile.TimeoutSeconds));
}
if (typedFromAnonymousPublish != null)
if (typedFromDynamicPublish != null)
{
await typedFromAnonymousPublish.Task.WaitAsync(TimeSpan.FromSeconds(profile.TimeoutSeconds));
await typedFromDynamicPublish.Task.WaitAsync(TimeSpan.FromSeconds(profile.TimeoutSeconds));
}
if (anonymousOnlyPublish != null)
if (dynamicOnlyPublish != null)
{
await anonymousOnlyPublish.Task.WaitAsync(TimeSpan.FromSeconds(profile.TimeoutSeconds));
await dynamicOnlyPublish.Task.WaitAsync(TimeSpan.FromSeconds(profile.TimeoutSeconds));
}
anonymousOnlySubscription?.Dispose();
dynamicOnlySubscription?.Dispose();
Console.WriteLine($"All distributed event scenarios passed ({profile.Name}).");
}
@ -113,20 +113,20 @@ public class DistEventScenarioRunner : IDistEventScenarioRunner, ITransientDepen
useOutbox: profile.UseOutbox);
}
if (profile.EnableTypedFromAnonymousScenario)
if (profile.EnableTypedFromDynamicScenario)
{
await _distributedEventBus.PublishAsync(
typedEventName,
new { Value = profile.TypedFromAnonymousValue },
new { Value = profile.TypedFromDynamicValue },
onUnitOfWorkComplete: profile.OnUnitOfWorkComplete,
useOutbox: profile.UseOutbox);
}
if (profile.EnableAnonymousOnlyScenario)
if (profile.EnableDynamicOnlyScenario)
{
await _distributedEventBus.PublishAsync(
profile.AnonymousOnlyEventName,
new { Message = profile.AnonymousOnlyMessage },
profile.DynamicOnlyEventName,
new { Message = profile.DynamicOnlyMessage },
onUnitOfWorkComplete: profile.OnUnitOfWorkComplete,
useOutbox: profile.UseOutbox);
}

Loading…
Cancel
Save