Browse Source

Refactor anonymous event handling and subscriptions

Centralize and simplify anonymous event handling across transports. Introduces AnonymousEventDataConverter and changes AnonymousEventData to store optional JsonData with a FromJson factory. Adds helpers (CreateAnonymousEnvelope, TryPublishTypedByEventNameAsync, TryResolveStoredEventData, TryResolveIncomingEvent) and unifies handling logic in Dapr/Azure/Kafka/RabbitMQ/Rebus implementations. Also deduplicates subscription registration using locking, cleans up empty anonymous handler maps, and removes duplicated JSON conversion code. Tests updated to match the new anonymous event semantics.
pull/25023/head
SALİH ÖZKARA 3 weeks ago
parent
commit
721dbe0f9b
  1. 81
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs
  2. 115
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs
  3. 95
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs
  4. 181
      framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs
  5. 187
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
  6. 179
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  7. 188
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  8. 203
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  9. 74
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs
  10. 139
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs
  11. 16
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs
  12. 129
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs
  13. 144
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs
  14. 36
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs

81
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
@ -96,30 +96,11 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
if (IsAbpDaprEventData(data))
{
var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As<AbpDaprEventData>();
var eventType = distributedEventBus.GetEventType(daprEventData.Topic);
if (eventType != null)
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId);
}else if (distributedEventBus.IsAnonymousEvent(daprEventData.Topic))
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId);
}
await TryHandleAbpDaprEnvelopeAsync(distributedEventBus, daprSerializer, data);
}
else
{
var eventType = distributedEventBus.GetEventType(topic);
if (eventType != null)
{
var eventData = daprSerializer.Deserialize(data, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData);
}else if (distributedEventBus.IsAnonymousEvent(topic))
{
var eventData = daprSerializer.Deserialize(data, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(topic, eventData));
}
await TryHandleDirectDaprEventAsync(distributedEventBus, daprSerializer, topic!, data);
}
httpContext.Response.StatusCode = 200;
@ -136,4 +117,60 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) &&
objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase));
}
private static async Task TryHandleAbpDaprEnvelopeAsync(
DaprDistributedEventBus distributedEventBus,
IDaprSerializer daprSerializer,
string data)
{
var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As<AbpDaprEventData>();
if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, daprEventData.Topic, daprEventData.JsonData, out var eventType, out var eventData))
{
return;
}
await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId);
}
private static async Task TryHandleDirectDaprEventAsync(
DaprDistributedEventBus distributedEventBus,
IDaprSerializer daprSerializer,
string topic,
string data)
{
if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, topic, data, out var eventType, out var eventData))
{
return;
}
await distributedEventBus.TriggerHandlersAsync(eventType, eventData);
}
private static bool TryResolveIncomingEvent(
DaprDistributedEventBus distributedEventBus,
IDaprSerializer daprSerializer,
string topic,
string data,
out Type eventType,
out object eventData)
{
var typedEventType = distributedEventBus.GetEventType(topic);
if (typedEventType != null)
{
eventType = typedEventType;
eventData = daprSerializer.Deserialize(data, typedEventType);
return true;
}
if (!distributedEventBus.IsAnonymousEvent(topic))
{
eventType = default!;
eventData = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = AnonymousEventData.FromJson(topic, data);
return true;
}
}

115
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs

@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
namespace Volo.Abp.EventBus;
/// <summary>
@ -18,11 +14,14 @@ public class AnonymousEventData
public string EventName { get; }
/// <summary>
/// The raw event data payload. Can be a CLR object, <see cref="JsonElement"/>, or any serializable object.
/// The raw event data payload. Can be a CLR object, <see cref="System.Text.Json.JsonElement"/>, or any serializable object.
/// </summary>
public object Data { get; }
internal object Data { get; }
private JsonElement? _cachedJsonElement;
/// <summary>
/// The raw JSON payload when the event is created from transport data.
/// </summary>
public string? JsonData { get; }
/// <summary>
/// Creates a new instance of <see cref="AnonymousEventData"/>.
@ -36,105 +35,17 @@ public class AnonymousEventData
}
/// <summary>
/// Converts the <see cref="Data"/> to a loosely-typed object graph
/// (dictionaries for objects, lists for arrays, primitives for values).
/// </summary>
/// <returns>A CLR object representation of the event data</returns>
public object ConvertToTypedObject()
{
return ConvertElement(GetJsonElement());
}
/// <summary>
/// Converts the <see cref="Data"/> to a strongly-typed <typeparamref name="T"/> object.
/// Returns the data directly if it is already of type <typeparamref name="T"/>,
/// otherwise deserializes from JSON.
/// </summary>
/// <typeparam name="T">Target type to convert to</typeparam>
/// <returns>The deserialized object of type <typeparamref name="T"/></returns>
/// <exception cref="InvalidOperationException">Thrown when deserialization fails</exception>
public T ConvertToTypedObject<T>()
{
if (Data is T typedData)
{
return typedData;
}
return GetJsonElement().Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}.");
}
/// <summary>
/// Converts the <see cref="Data"/> to the specified <paramref name="type"/>.
/// Returns the data directly if it is already an instance of the target type,
/// otherwise deserializes from JSON.
/// Creates a new instance of <see cref="AnonymousEventData"/> from raw JSON.
/// </summary>
/// <param name="type">Target type to convert to</param>
/// <returns>The deserialized object</returns>
/// <exception cref="InvalidOperationException">Thrown when deserialization fails</exception>
public object ConvertToTypedObject(Type type)
public static AnonymousEventData FromJson(string eventName, string jsonData)
{
if (type.IsInstanceOfType(Data))
{
return Data;
}
return GetJsonElement().Deserialize(type)
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}.");
return new AnonymousEventData(eventName, data: null!, jsonData);
}
private JsonElement GetJsonElement()
private AnonymousEventData(string eventName, object data, string? jsonData)
{
if (_cachedJsonElement.HasValue)
{
return _cachedJsonElement.Value;
}
if (Data is JsonElement existingElement)
{
_cachedJsonElement = existingElement;
return existingElement;
}
_cachedJsonElement = JsonSerializer.SerializeToElement(Data);
return _cachedJsonElement.Value;
}
private static object ConvertElement(JsonElement element)
{
switch (element.ValueKind)
{
case JsonValueKind.Object:
{
var obj = new Dictionary<string, object?>();
foreach (var property in element.EnumerateObject())
{
obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null
? null
: ConvertElement(property.Value);
}
return obj;
}
case JsonValueKind.Array:
return element.EnumerateArray()
.Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item))
.ToList();
case JsonValueKind.String:
return element.GetString()!;
case JsonValueKind.Number when element.TryGetInt64(out var longValue):
return longValue;
case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue):
return decimalValue;
case JsonValueKind.Number when element.TryGetDouble(out var doubleValue):
return doubleValue;
case JsonValueKind.True:
return true;
case JsonValueKind.False:
return false;
case JsonValueKind.Null:
case JsonValueKind.Undefined:
default:
return null!;
}
EventName = eventName;
Data = data;
JsonData = jsonData;
}
}

95
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs

@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
namespace Volo.Abp.EventBus;
public static class AnonymousEventDataConverter
{
public static T ConvertToTypedObject<T>(AnonymousEventData eventData)
{
if (eventData.Data is T typedData)
{
return typedData;
}
return ParseJsonElement(eventData).Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}.");
}
public static object ConvertToTypedObject(AnonymousEventData eventData, Type type)
{
if (type.IsInstanceOfType(eventData.Data))
{
return eventData.Data;
}
return ParseJsonElement(eventData).Deserialize(type)
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}.");
}
public static object ConvertToLooseObject(AnonymousEventData eventData)
{
return ConvertElement(ParseJsonElement(eventData));
}
public static string GetJsonData(AnonymousEventData eventData)
{
return eventData.JsonData ?? ParseJsonElement(eventData).GetRawText();
}
private static JsonElement ParseJsonElement(AnonymousEventData eventData)
{
if (eventData.Data is JsonElement existingElement)
{
return existingElement;
}
if (eventData.JsonData != null)
{
return JsonDocument.Parse(eventData.JsonData).RootElement.Clone();
}
return JsonSerializer.SerializeToElement(eventData.Data);
}
private static object ConvertElement(JsonElement element)
{
switch (element.ValueKind)
{
case JsonValueKind.Object:
{
var obj = new Dictionary<string, object?>();
foreach (var property in element.EnumerateObject())
{
obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null
? null
: ConvertElement(property.Value);
}
return obj;
}
case JsonValueKind.Array:
return element.EnumerateArray()
.Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item))
.ToList();
case JsonValueKind.String:
return element.GetString()!;
case JsonValueKind.Number when element.TryGetInt64(out var longValue):
return longValue;
case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue):
return decimalValue;
case JsonValueKind.Number when element.TryGetDouble(out var doubleValue):
return doubleValue;
case JsonValueKind.True:
return true;
case JsonValueKind.False:
return false;
case JsonValueKind.Null:
case JsonValueKind.Undefined:
default:
return null!;
}
}
}

181
framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

@ -2,7 +2,6 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
@ -92,10 +91,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(message.Body.ToArray(), eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(eventName))
else if (HasAnonymousHandlers(eventName))
{
var data = Serializer.Deserialize<object>(message.Body.ToArray());
eventData = new AnonymousEventData(eventName, data);
eventData = CreateAnonymousEventData(eventName, Serializer.Deserialize<string>(message.Body.ToArray()));
eventType = typeof(AnonymousEventData);
}
else
@ -117,14 +115,21 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
added = true;
}
});
if (factory.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
@ -132,14 +137,21 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
factories.Add(handler);
added = true;
}
});
if (handler.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
@ -199,20 +211,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -287,20 +288,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
var element = Serializer.Deserialize<object>(incomingEvent.EventData);
eventData = new AnonymousEventData(incomingEvent.EventName, element);
eventType = typeof(AnonymousEventData);
}
else
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -400,29 +388,45 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories => factories.Remove(factory));
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories => factories.Clear());
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -448,6 +452,81 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType);

187
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

@ -70,14 +70,21 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
added = true;
}
});
if (factory.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
@ -85,14 +92,21 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
factories.Add(handler);
added = true;
}
});
if (handler.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
@ -149,20 +163,9 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -178,18 +181,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object));
}
else
if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -230,19 +222,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
}
else
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -327,33 +307,51 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public bool IsAnonymousEvent(string eventName)
{
return AnonymousHandlerFactories.ContainsKey(eventName);
return HasAnonymousHandlers(eventName);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -379,6 +377,81 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

179
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -2,7 +2,6 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
@ -92,10 +91,9 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(message.Value, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(eventName))
else if (HasAnonymousHandlers(eventName))
{
var element = Serializer.Deserialize<object>(message.Value);
eventData = new AnonymousEventData(eventName, element);
eventData = CreateAnonymousEventData(eventName, message.Value);
eventType = typeof(AnonymousEventData);
}
else
@ -117,14 +115,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
added = true;
}
});
if (factory.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
@ -132,14 +137,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
factories.Add(handler);
added = true;
}
});
if (handler.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
@ -200,20 +212,9 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
@ -325,20 +326,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
var element = Serializer.Deserialize<object>(incomingEvent.EventData);
eventData = new AnonymousEventData(incomingEvent.EventName, element);
eventType = typeof(AnonymousEventData);
}
else
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -436,27 +424,45 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -482,6 +488,81 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

188
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -109,10 +109,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
{
eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(eventName))
else if (HasAnonymousHandlers(eventName))
{
eventType = typeof(AnonymousEventData);
eventData = new AnonymousEventData(eventName, Serializer.Deserialize<object>(ea.Body.ToArray()));
eventData = CreateAnonymousEventData(eventName, ea.Body.ToArray());
}
else
{
@ -134,15 +134,24 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(factory);
added = true;
}
});
if (factory.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
if (isFirstHandler)
{
Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
}
@ -154,15 +163,24 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
if (handler.IsInFactories(handlerFactories))
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(handler);
added = true;
}
});
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
if (isFirstHandler)
{
Consumer.BindAsync(eventName);
}
@ -227,20 +245,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -305,19 +312,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize<object>(incomingEvent.EventData));
eventType = typeof(AnonymousEventData);
}
else
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -487,27 +482,45 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -533,6 +546,81 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

203
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -100,15 +100,24 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(factory);
added = true;
}
});
if (factory.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
if (isFirstHandler)
{
Rebus.Subscribe(eventType);
}
@ -120,15 +129,24 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(handler);
added = true;
}
});
if (handler.IsInFactories(handlerFactories))
if (!added)
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading!
if (isFirstHandler && AnonymousHandlerFactories.Count == 1)
{
Rebus.Subscribe(typeof(AnonymousEventData));
}
@ -193,20 +211,9 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -228,19 +235,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
}
else
if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -273,15 +268,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
}
await scope.CompleteAsync();
@ -292,19 +278,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
}
else
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
{
return;
}
@ -393,27 +367,45 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -439,6 +431,81 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

74
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
@ -112,15 +113,9 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
{
var eventType = GetEventTypeByEventName(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox)
?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
public abstract Task PublishFromOutboxAsync(
@ -152,14 +147,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
{
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
(var eventName, eventData) = ResolveEventForPublishing(eventType, eventData);
var (eventName, resolvedEventData) = ResolveEventForPublishing(eventType, eventData);
await OnAddToOutboxAsync(eventName, eventType, eventData);
await OnAddToOutboxAsync(eventName, eventType, resolvedEventData);
var outgoingEventInfo = new OutgoingEventInfo(
GuidGenerator.Create(),
eventName,
Serialize(eventData),
SerializeEventData(resolvedEventData),
Clock.Now
);
@ -214,13 +209,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
}
}
eventData = GetEventData(eventData);
var incomingEventInfo = new IncomingEventInfo(
GuidGenerator.Create(),
messageId!,
eventName,
Serialize(eventData),
SerializeEventData(eventData),
Clock.Now
);
incomingEventInfo.SetCorrelationId(correlationId!);
@ -235,6 +228,55 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
protected abstract byte[] Serialize(object eventData);
protected virtual byte[] SerializeEventData(object eventData)
{
if (eventData is AnonymousEventData anonymousEventData)
{
return Encoding.UTF8.GetBytes(AnonymousEventDataConverter.GetJsonData(anonymousEventData));
}
return Serialize(eventData);
}
protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, byte[] eventData)
{
return AnonymousEventData.FromJson(eventName, Encoding.UTF8.GetString(eventData));
}
protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, string eventData)
{
return AnonymousEventData.FromJson(eventName, eventData);
}
protected virtual AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
protected virtual Task? TryPublishTypedByEventNameAsync(
string eventName,
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
var eventType = GetEventTypeByEventName(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox);
}
protected virtual Task PublishAnonymousByEventNameAsync(
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
protected virtual async Task TriggerHandlersDirectAsync(Type eventType, object eventData)
{
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
@ -297,7 +339,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
{
if (eventData is AnonymousEventData anonymousEventData)
{
return anonymousEventData.ConvertToTypedObject();
return AnonymousEventDataConverter.ConvertToLooseObject(anonymousEventData);
}
return eventData;

139
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -77,7 +77,8 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
AnonymousEventNames.GetOrAdd(eventName, true);
return LocalEventBus.Subscribe(eventName, handler);
LocalEventBus.Subscribe(eventName, handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
/// <inheritdoc/>
@ -85,7 +86,8 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
EventTypes.GetOrAdd(eventName, eventType);
return LocalEventBus.Subscribe(eventType, factory);
LocalEventBus.Subscribe(eventType, factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
@ -107,12 +109,14 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
LocalEventBus.Unsubscribe(eventName, factory);
CleanupAnonymousEventName(eventName);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
LocalEventBus.Unsubscribe(eventName, handler);
CleanupAnonymousEventName(eventName);
}
/// <inheritdoc/>
@ -125,6 +129,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override void UnsubscribeAll(string eventName)
{
LocalEventBus.UnsubscribeAll(eventName);
CleanupAnonymousEventName(eventName);
}
/// <inheritdoc/>
@ -173,25 +178,14 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
}
if (!AnonymousEventNames.ContainsKey(eventName))
{
throw new AbpException($"Unknown event name: {eventName}");
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox)
?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
if (await AddToInboxAsync(Guid.NewGuid().ToString(), EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, null))
if (await AddToInboxAsync(Guid.NewGuid().ToString(), GetEventName(eventType, eventData), eventType, eventData, null))
{
return;
}
@ -220,19 +214,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
EventData = outgoingEvent.EventData
});
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
if (eventType == null)
if (!TryResolveStoredEventType(outgoingEvent.EventName, out var eventType))
{
var isAnonymous = AnonymousEventNames.ContainsKey(outgoingEvent.EventName);
if (!isAnonymous)
{
return;
}
eventType = typeof(AnonymousEventData);
return;
}
var eventData = JsonSerializer.Deserialize(Encoding.UTF8.GetString(outgoingEvent.EventData), eventType)!;
var eventData = DeserializeStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, eventType);
if (await AddToInboxAsync(Guid.NewGuid().ToString(), outgoingEvent.EventName, eventType, eventData, null))
{
return;
@ -251,19 +238,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
if (!TryResolveStoredEventType(incomingEvent.EventName, out var eventType))
{
var isAnonymous = AnonymousEventNames.ContainsKey(incomingEvent.EventName);
if (!isAnonymous)
{
return;
}
eventType = typeof(AnonymousEventData);
return;
}
var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType);
var eventData = DeserializeStoredEventData(incomingEvent.EventName, incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
@ -303,4 +283,89 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
return EventTypes.GetOrDefault(eventName);
}
protected override AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
protected override Task? TryPublishTypedByEventNameAsync(
string eventName,
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox);
}
protected override Task PublishAnonymousByEventNameAsync(
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
if (!HasAnonymousEventName(anonymousEventData.EventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
protected virtual bool TryResolveStoredEventType(string eventName, out Type eventType)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
return true;
}
if (!HasAnonymousEventName(eventName))
{
return false;
}
eventType = typeof(AnonymousEventData);
return true;
}
protected virtual object DeserializeStoredEventData(string eventName, byte[] eventData, Type eventType)
{
if (eventType == typeof(AnonymousEventData))
{
return CreateAnonymousEventData(eventName, eventData);
}
return JsonSerializer.Deserialize(Encoding.UTF8.GetString(eventData), eventType)!;
}
protected virtual void CleanupAnonymousEventName(string eventName)
{
if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any())
{
AnonymousEventNames.TryRemove(eventName, out _);
}
}
protected virtual bool HasAnonymousEventName(string eventName)
{
if (!AnonymousEventNames.ContainsKey(eventName))
{
return false;
}
if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any())
{
AnonymousEventNames.TryRemove(eventName, out _);
return false;
}
return true;
}
}

16
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs

@ -174,9 +174,7 @@ public abstract class EventBusBase : IEventBus
actualEventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType))
{
var resolvedEventData = eventData is AnonymousEventData aed
? aed.ConvertToTypedObject(actualEventType)
: eventData;
var resolvedEventData = ResolveActualEventData(eventData, actualEventType);
var genericArg = actualEventType.GetGenericArguments()[0];
var baseArg = genericArg.GetTypeInfo().BaseType;
@ -209,7 +207,7 @@ public abstract class EventBusBase : IEventBus
{
if (eventData is AnonymousEventData anonymousEventData && handlerEventType != typeof(AnonymousEventData))
{
return anonymousEventData.ConvertToTypedObject(handlerEventType);
return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, handlerEventType);
}
if (handlerEventType == typeof(AnonymousEventData) && eventData is not AnonymousEventData)
@ -220,6 +218,16 @@ public abstract class EventBusBase : IEventBus
return eventData;
}
protected virtual object ResolveActualEventData(object eventData, Type actualEventType)
{
if (eventData is AnonymousEventData anonymousEventData)
{
return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, actualEventType);
}
return eventData;
}
protected void ThrowOriginalExceptions(Type eventType, List<Exception> exceptions)
{
if (exceptions.Count == 1)

129
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

@ -60,7 +60,8 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories =>
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
@ -140,21 +141,33 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
@ -166,28 +179,21 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
}
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
var isAnonymous = AnonymousEventHandlerFactories.ContainsKey(eventName);
if (!isAnonymous)
{
throw new AbpException($"Unknown event name: {eventName}");
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
@ -225,10 +231,11 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
foreach (var factory in handlerFactory.Value)
{
using var handler = factory.GetHandler();
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
handlerFactory.Key,
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(handler.EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
}
}
@ -236,10 +243,11 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
foreach (var factory in handlerFactory.Value)
{
using var handler = factory.GetHandler();
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
typeof(AnonymousEventData),
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(handler.EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
}
}
@ -290,6 +298,67 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
return AnonymousEventHandlerFactories.GetOrAdd(eventName, (name) => new List<IEventHandlerFactory>());
}
private bool TryGetAnonymousHandlerFactories(string eventName, out List<IEventHandlerFactory> handlerFactories)
{
return AnonymousEventHandlerFactories.TryGetValue(eventName, out handlerFactories!);
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousEventHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousEventHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousEventHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
if (handlerEventType == targetEventType)

144
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Domain.Entities.Events.Distributed;
@ -21,20 +22,32 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
[Fact]
public async Task Should_Call_Handler_AndDispose()
{
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
var handleCount = 0;
var disposeCount = 0;
var factory = new CountingDistributedEventHandlerFactory(
() => handleCount++,
() => disposeCount++);
using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory);
await DistributedEventBus.PublishAsync(new MySimpleEventData(1));
await DistributedEventBus.PublishAsync(new MySimpleEventData(2));
await DistributedEventBus.PublishAsync(new MySimpleEventData(3));
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount);
Assert.Equal(3, handleCount);
Assert.Equal(3, disposeCount);
}
[Fact]
public async Task Should_Handle_Typed_Handler_When_Published_With_EventName()
{
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
var handleCount = 0;
var disposeCount = 0;
var factory = new CountingDistributedEventHandlerFactory(
() => handleCount++,
() => disposeCount++);
using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory);
var eventName = EventNameAttribute.GetNameOrDefault<MySimpleEventData>();
await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1));
@ -44,8 +57,8 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
});
await DistributedEventBus.PublishAsync(eventName, new { Value = 3 });
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount);
Assert.Equal(3, handleCount);
Assert.Equal(3, disposeCount);
}
[Fact]
@ -82,7 +95,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
{
handleCount++;
d.ConvertToTypedObject().ShouldNotBeNull();
AnonymousEventDataConverter.ConvertToLooseObject(d).ShouldNotBeNull();
await Task.CompletedTask;
})));
@ -187,16 +200,15 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
[Fact]
public async Task Should_Throw_For_Unknown_Event_Name()
public async Task Should_Ignore_Unknown_Event_Name()
{
await Assert.ThrowsAsync<AbpException>(() =>
DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }));
await DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 });
}
[Fact]
public async Task Should_Convert_AnonymousEventData_To_Typed_Object()
{
MySimpleEventData? receivedData = null;
MySimpleEventData receivedData = null!;
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData>(async (data) =>
{
@ -211,6 +223,63 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
receivedData.Value.ShouldBe(42);
}
[Fact]
public async Task Should_Rehydrate_Anonymous_Event_From_Outbox_Using_Raw_Json()
{
var localDistributedEventBus = GetRequiredService<LocalDistributedEventBus>();
AnonymousEventData receivedData = null!;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
using var subscription = localDistributedEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async data =>
{
receivedData = data;
await Task.CompletedTask;
})));
var outgoingEvent = new OutgoingEventInfo(
Guid.NewGuid(),
eventName,
Encoding.UTF8.GetBytes("{\"Value\":42}"),
DateTime.UtcNow);
await localDistributedEventBus.PublishFromOutboxAsync(outgoingEvent, new OutboxConfig("Test") { DatabaseName = "Test" });
receivedData.ShouldNotBeNull();
receivedData.EventName.ShouldBe(eventName);
AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("{\"Value\":42}");
AnonymousEventDataConverter.ConvertToTypedObject<MySimpleEventData>(receivedData).Value.ShouldBe(42);
}
[Fact]
public async Task Should_Rehydrate_Anonymous_Event_From_Inbox_Using_Raw_Json()
{
var localDistributedEventBus = GetRequiredService<LocalDistributedEventBus>();
AnonymousEventData receivedData = null!;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
using var subscription = localDistributedEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async data =>
{
receivedData = data;
await Task.CompletedTask;
})));
var incomingEvent = new IncomingEventInfo(
Guid.NewGuid(),
Guid.NewGuid().ToString("N"),
eventName,
Encoding.UTF8.GetBytes("\"hello\""),
DateTime.UtcNow);
await localDistributedEventBus.ProcessFromInboxAsync(incomingEvent, new InboxConfig("Test") { DatabaseName = "Test" });
receivedData.ShouldNotBeNull();
receivedData.EventName.ShouldBe(eventName);
AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("\"hello\"");
AnonymousEventDataConverter.ConvertToTypedObject<string>(receivedData).ShouldBe("hello");
}
[Fact]
public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant()
{
@ -309,4 +378,57 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
}
private sealed class CountingDistributedEventHandlerFactory : IEventHandlerFactory
{
private readonly Action _handleAction;
private readonly Action _disposeAction;
public CountingDistributedEventHandlerFactory(Action handleAction, Action disposeAction)
{
_handleAction = handleAction;
_disposeAction = disposeAction;
}
public IEventHandlerDisposeWrapper GetHandler()
{
var wasHandled = false;
return new EventHandlerDisposeWrapper(
new CountingDistributedEventHandler(
_handleAction,
() => wasHandled = true),
() =>
{
if (wasHandled)
{
_disposeAction();
}
}
);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories.Contains(this);
}
}
private sealed class CountingDistributedEventHandler : IDistributedEventHandler<MySimpleEventData>
{
private readonly Action _handleAction;
private readonly Action _markHandled;
public CountingDistributedEventHandler(Action handleAction, Action markHandled)
{
_handleAction = handleAction;
_markHandled = markHandled;
}
public Task HandleEventAsync(MySimpleEventData eventData)
{
_markHandled();
_handleAction();
return Task.CompletedTask;
}
}
}

36
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs

@ -48,7 +48,7 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
[Fact]
public async Task Should_Convert_Dictionary_To_Typed_Handler()
{
MySimpleEventData? receivedData = null;
MySimpleEventData receivedData = null!;
using var subscription = LocalEventBus.Subscribe<MySimpleEventData>(async (data) =>
{
@ -118,29 +118,28 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
}
[Fact]
public async Task Should_Throw_For_Unknown_Event_Name()
public async Task Should_Ignore_Unknown_Event_Name()
{
await Assert.ThrowsAsync<AbpException>(() =>
LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }));
await LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 });
}
[Fact]
public async Task Should_ConvertToTypedObject_In_Anonymous_Handler()
{
object? receivedData = null;
object receivedData = null!;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
{
receivedData = d.ConvertToTypedObject();
receivedData = AnonymousEventDataConverter.ConvertToLooseObject(d);
await Task.CompletedTask;
})));
await LocalEventBus.PublishAsync(eventName, new { Name = "Hello", Count = 42 });
receivedData.ShouldNotBeNull();
var dict = receivedData.ShouldBeOfType<Dictionary<string, object?>>();
var dict = receivedData.ShouldBeOfType<Dictionary<string, object>>();
dict["Name"].ShouldBe("Hello");
dict["Count"].ShouldBe(42L);
}
@ -148,13 +147,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
[Fact]
public async Task Should_ConvertToTypedObject_Generic_In_Anonymous_Handler()
{
MySimpleEventData? receivedData = null;
MySimpleEventData receivedData = null!;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
{
receivedData = d.ConvertToTypedObject<MySimpleEventData>();
receivedData = AnonymousEventDataConverter.ConvertToTypedObject<MySimpleEventData>(d);
await Task.CompletedTask;
})));
@ -163,4 +162,23 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
receivedData.ShouldNotBeNull();
receivedData.Value.ShouldBe(99);
}
[Fact]
public void Should_Roundtrip_Raw_Json_Without_Object_Deserialization()
{
var eventData = AnonymousEventData.FromJson("TestEvent", "{\"Value\":42,\"Name\":\"hello\"}");
eventData.EventName.ShouldBe("TestEvent");
AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("{\"Value\":42,\"Name\":\"hello\"}");
AnonymousEventDataConverter.ConvertToTypedObject<MySimpleEventData>(eventData).Value.ShouldBe(42);
}
[Fact]
public void Should_Preserve_String_Payload_As_Raw_Json()
{
var eventData = AnonymousEventData.FromJson("TestEvent", "\"hello\"");
AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("\"hello\"");
AnonymousEventDataConverter.ConvertToTypedObject<string>(eventData).ShouldBe("hello");
}
}

Loading…
Cancel
Save