Browse Source

Unify anonymous event handling across buses

Add native anonymous event support and simplify handling across transports. AnonymousEventData now contains conversion helpers (ConvertToTypedObject/ConvertToTypedObject<T>/ConvertToTypedObject -> loose typed object), caching JSON elements and replacing the removed AnonymousEventDataConverter. Multiple distributed event bus implementations (Azure, Dapr, Kafka, RabbitMQ, Rebus) were updated to: detect anonymous handlers via AnonymousHandlerFactories, construct AnonymousEventData when appropriate, resolve event types at publish/process time, simplify Subscribe/Unsubscribe logic (avoid duplicate-factory checks using IsInFactories then add), and throw on unknown event names in PublishAsync. AbpAspNetCoreMvcDaprEventBusModule was refactored to deserialize and trigger handlers inline for both envelope and direct Dapr events. Tests updated accordingly and a small cursor hook state file was added.
pull/25023/head
SALİH ÖZKARA 3 months ago
parent
commit
fb47069209
  1. 81
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs
  2. 115
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs
  3. 95
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs
  4. 181
      framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs
  5. 187
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
  6. 179
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  7. 188
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  8. 203
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  9. 74
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs
  10. 164
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs
  11. 16
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs
  12. 153
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs
  13. 144
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs
  14. 36
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs

81
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
@ -96,11 +96,30 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
if (IsAbpDaprEventData(data))
{
await TryHandleAbpDaprEnvelopeAsync(distributedEventBus, daprSerializer, data);
var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As<AbpDaprEventData>();
var eventType = distributedEventBus.GetEventType(daprEventData.Topic);
if (eventType != null)
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId);
}else if (distributedEventBus.IsAnonymousEvent(daprEventData.Topic))
{
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId);
}
}
else
{
await TryHandleDirectDaprEventAsync(distributedEventBus, daprSerializer, topic!, data);
var eventType = distributedEventBus.GetEventType(topic);
if (eventType != null)
{
var eventData = daprSerializer.Deserialize(data, eventType);
await distributedEventBus.TriggerHandlersAsync(eventType, eventData);
}else if (distributedEventBus.IsAnonymousEvent(topic))
{
var eventData = daprSerializer.Deserialize(data, typeof(object));
await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(topic, eventData));
}
}
httpContext.Response.StatusCode = 200;
@ -117,60 +136,4 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) &&
objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase));
}
private static async Task TryHandleAbpDaprEnvelopeAsync(
DaprDistributedEventBus distributedEventBus,
IDaprSerializer daprSerializer,
string data)
{
var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As<AbpDaprEventData>();
if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, daprEventData.Topic, daprEventData.JsonData, out var eventType, out var eventData))
{
return;
}
await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId);
}
private static async Task TryHandleDirectDaprEventAsync(
DaprDistributedEventBus distributedEventBus,
IDaprSerializer daprSerializer,
string topic,
string data)
{
if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, topic, data, out var eventType, out var eventData))
{
return;
}
await distributedEventBus.TriggerHandlersAsync(eventType, eventData);
}
private static bool TryResolveIncomingEvent(
DaprDistributedEventBus distributedEventBus,
IDaprSerializer daprSerializer,
string topic,
string data,
out Type eventType,
out object eventData)
{
var typedEventType = distributedEventBus.GetEventType(topic);
if (typedEventType != null)
{
eventType = typedEventType;
eventData = daprSerializer.Deserialize(data, typedEventType);
return true;
}
if (!distributedEventBus.IsAnonymousEvent(topic))
{
eventType = default!;
eventData = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = AnonymousEventData.FromJson(topic, data);
return true;
}
}

115
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs

@ -1,4 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
namespace Volo.Abp.EventBus;
/// <summary>
@ -14,14 +18,11 @@ public class AnonymousEventData
public string EventName { get; }
/// <summary>
/// The raw event data payload. Can be a CLR object, <see cref="System.Text.Json.JsonElement"/>, or any serializable object.
/// The raw event data payload. Can be a CLR object, <see cref="JsonElement"/>, or any serializable object.
/// </summary>
internal object Data { get; }
public object Data { get; }
/// <summary>
/// The raw JSON payload when the event is created from transport data.
/// </summary>
public string? JsonData { get; }
private JsonElement? _cachedJsonElement;
/// <summary>
/// Creates a new instance of <see cref="AnonymousEventData"/>.
@ -35,17 +36,105 @@ public class AnonymousEventData
}
/// <summary>
/// Creates a new instance of <see cref="AnonymousEventData"/> from raw JSON.
/// Converts the <see cref="Data"/> to a loosely-typed object graph
/// (dictionaries for objects, lists for arrays, primitives for values).
/// </summary>
public static AnonymousEventData FromJson(string eventName, string jsonData)
/// <returns>A CLR object representation of the event data</returns>
public object ConvertToTypedObject()
{
return new AnonymousEventData(eventName, data: null!, jsonData);
return ConvertElement(GetJsonElement());
}
private AnonymousEventData(string eventName, object data, string? jsonData)
/// <summary>
/// Converts the <see cref="Data"/> to a strongly-typed <typeparamref name="T"/> object.
/// Returns the data directly if it is already of type <typeparamref name="T"/>,
/// otherwise deserializes from JSON.
/// </summary>
/// <typeparam name="T">Target type to convert to</typeparam>
/// <returns>The deserialized object of type <typeparamref name="T"/></returns>
/// <exception cref="InvalidOperationException">Thrown when deserialization fails</exception>
public T ConvertToTypedObject<T>()
{
EventName = eventName;
Data = data;
JsonData = jsonData;
if (Data is T typedData)
{
return typedData;
}
return GetJsonElement().Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}.");
}
/// <summary>
/// Converts the <see cref="Data"/> to the specified <paramref name="type"/>.
/// Returns the data directly if it is already an instance of the target type,
/// otherwise deserializes from JSON.
/// </summary>
/// <param name="type">Target type to convert to</param>
/// <returns>The deserialized object</returns>
/// <exception cref="InvalidOperationException">Thrown when deserialization fails</exception>
public object ConvertToTypedObject(Type type)
{
if (type.IsInstanceOfType(Data))
{
return Data;
}
return GetJsonElement().Deserialize(type)
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}.");
}
private JsonElement GetJsonElement()
{
if (_cachedJsonElement.HasValue)
{
return _cachedJsonElement.Value;
}
if (Data is JsonElement existingElement)
{
_cachedJsonElement = existingElement;
return existingElement;
}
_cachedJsonElement = JsonSerializer.SerializeToElement(Data);
return _cachedJsonElement.Value;
}
private static object ConvertElement(JsonElement element)
{
switch (element.ValueKind)
{
case JsonValueKind.Object:
{
var obj = new Dictionary<string, object?>();
foreach (var property in element.EnumerateObject())
{
obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null
? null
: ConvertElement(property.Value);
}
return obj;
}
case JsonValueKind.Array:
return element.EnumerateArray()
.Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item))
.ToList();
case JsonValueKind.String:
return element.GetString()!;
case JsonValueKind.Number when element.TryGetInt64(out var longValue):
return longValue;
case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue):
return decimalValue;
case JsonValueKind.Number when element.TryGetDouble(out var doubleValue):
return doubleValue;
case JsonValueKind.True:
return true;
case JsonValueKind.False:
return false;
case JsonValueKind.Null:
case JsonValueKind.Undefined:
default:
return null!;
}
}
}

95
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs

@ -1,95 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
namespace Volo.Abp.EventBus;
public static class AnonymousEventDataConverter
{
public static T ConvertToTypedObject<T>(AnonymousEventData eventData)
{
if (eventData.Data is T typedData)
{
return typedData;
}
return ParseJsonElement(eventData).Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}.");
}
public static object ConvertToTypedObject(AnonymousEventData eventData, Type type)
{
if (type.IsInstanceOfType(eventData.Data))
{
return eventData.Data;
}
return ParseJsonElement(eventData).Deserialize(type)
?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}.");
}
public static object ConvertToLooseObject(AnonymousEventData eventData)
{
return ConvertElement(ParseJsonElement(eventData));
}
public static string GetJsonData(AnonymousEventData eventData)
{
return eventData.JsonData ?? ParseJsonElement(eventData).GetRawText();
}
private static JsonElement ParseJsonElement(AnonymousEventData eventData)
{
if (eventData.Data is JsonElement existingElement)
{
return existingElement;
}
if (eventData.JsonData != null)
{
return JsonDocument.Parse(eventData.JsonData).RootElement.Clone();
}
return JsonSerializer.SerializeToElement(eventData.Data);
}
private static object ConvertElement(JsonElement element)
{
switch (element.ValueKind)
{
case JsonValueKind.Object:
{
var obj = new Dictionary<string, object?>();
foreach (var property in element.EnumerateObject())
{
obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null
? null
: ConvertElement(property.Value);
}
return obj;
}
case JsonValueKind.Array:
return element.EnumerateArray()
.Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item))
.ToList();
case JsonValueKind.String:
return element.GetString()!;
case JsonValueKind.Number when element.TryGetInt64(out var longValue):
return longValue;
case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue):
return decimalValue;
case JsonValueKind.Number when element.TryGetDouble(out var doubleValue):
return doubleValue;
case JsonValueKind.True:
return true;
case JsonValueKind.False:
return false;
case JsonValueKind.Null:
case JsonValueKind.Undefined:
default:
return null!;
}
}
}

181
framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

@ -2,6 +2,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
@ -91,9 +92,10 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(message.Body.ToArray(), eventType);
}
else if (HasAnonymousHandlers(eventName))
else if (AnonymousHandlerFactories.ContainsKey(eventName))
{
eventData = CreateAnonymousEventData(eventName, Serializer.Deserialize<string>(message.Body.ToArray()));
var data = Serializer.Deserialize<object>(message.Body.ToArray());
eventData = new AnonymousEventData(eventName, data);
eventType = typeof(AnonymousEventData);
}
else
@ -115,21 +117,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
added = true;
}
});
if (!added)
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
@ -137,21 +132,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
factories.Add(handler);
added = true;
}
});
if (!added)
if (handler.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
@ -211,9 +199,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -288,7 +287,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
var element = Serializer.Deserialize<object>(incomingEvent.EventData);
eventData = new AnonymousEventData(incomingEvent.EventName, element);
eventType = typeof(AnonymousEventData);
}
else
{
return;
}
@ -388,45 +400,29 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -452,81 +448,6 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType);

187
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

@ -70,21 +70,14 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
added = true;
}
});
if (!added)
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
@ -92,21 +85,14 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
factories.Add(handler);
added = true;
}
});
if (!added)
if (handler.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
@ -163,9 +149,20 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -181,7 +178,18 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object));
}
else
{
return;
}
@ -222,7 +230,19 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
}
else
{
return;
}
@ -307,51 +327,33 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public bool IsAnonymousEvent(string eventName)
{
return HasAnonymousHandlers(eventName);
return AnonymousHandlerFactories.ContainsKey(eventName);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -377,81 +379,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

179
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -2,6 +2,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
@ -91,9 +92,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
eventData = Serializer.Deserialize(message.Value, eventType);
}
else if (HasAnonymousHandlers(eventName))
else if (AnonymousHandlerFactories.ContainsKey(eventName))
{
eventData = CreateAnonymousEventData(eventName, message.Value);
var element = Serializer.Deserialize<object>(message.Value);
eventData = new AnonymousEventData(eventName, element);
eventType = typeof(AnonymousEventData);
}
else
@ -115,21 +117,14 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
added = true;
}
});
if (!added)
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
@ -137,21 +132,14 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
factories.Add(handler);
added = true;
}
});
if (!added)
if (handler.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
}
@ -212,9 +200,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
@ -326,7 +325,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
var element = Serializer.Deserialize<object>(incomingEvent.EventData);
eventData = new AnonymousEventData(incomingEvent.EventName, element);
eventType = typeof(AnonymousEventData);
}
else
{
return;
}
@ -424,45 +436,27 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -488,81 +482,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

188
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -109,10 +109,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
{
eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
}
else if (HasAnonymousHandlers(eventName))
else if (AnonymousHandlerFactories.ContainsKey(eventName))
{
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, ea.Body.ToArray());
eventData = new AnonymousEventData(eventName, Serializer.Deserialize<object>(ea.Body.ToArray()));
}
else
{
@ -134,24 +134,15 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(factory);
added = true;
}
});
if (!added)
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
if (isFirstHandler)
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
}
@ -163,24 +154,15 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(handler);
added = true;
}
});
if (!added)
if (handler.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
if (isFirstHandler)
handlerFactories.Add(handler);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Consumer.BindAsync(eventName);
}
@ -245,9 +227,20 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -312,7 +305,19 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize<object>(incomingEvent.EventData));
eventType = typeof(AnonymousEventData);
}
else
{
return;
}
@ -482,45 +487,27 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -546,81 +533,6 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

203
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -100,24 +100,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!factory.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(factory);
added = true;
}
});
if (!added)
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
if (isFirstHandler)
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Rebus.Subscribe(eventType);
}
@ -129,24 +120,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
var added = false;
var isFirstHandler = false;
handlerFactories.Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
isFirstHandler = factories.Count == 0;
factories.Add(handler);
added = true;
}
});
if (!added)
if (handler.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
if (isFirstHandler && AnonymousHandlerFactories.Count == 1)
handlerFactories.Add(handler);
if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading!
{
Rebus.Subscribe(typeof(AnonymousEventData));
}
@ -211,9 +193,20 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
if (AnonymousHandlerFactories.ContainsKey(eventName))
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
throw new AbpException($"Unknown event name: {eventName}");
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
@ -235,7 +228,19 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
{
eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
}
else
{
return;
}
@ -268,6 +273,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
}
await scope.CompleteAsync();
@ -278,7 +292,19 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData))
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
object eventData;
if (eventType != null)
{
eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
}
else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName))
{
eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData);
}
else
{
return;
}
@ -367,45 +393,27 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetAnonymousHandlerFactories(string eventName)
@ -431,81 +439,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List<IEventHandlerFactory>());
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
eventData = Serializer.Deserialize(payload, eventType);
return true;
}
if (!HasAnonymousHandlers(eventName))
{
eventData = default!;
eventType = default!;
return false;
}
eventType = typeof(AnonymousEventData);
eventData = CreateAnonymousEventData(eventName, payload);
return true;
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type

74
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
@ -113,9 +112,15 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox)
?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox);
var eventType = GetEventTypeByEventName(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
public abstract Task PublishFromOutboxAsync(
@ -147,14 +152,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
{
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
var (eventName, resolvedEventData) = ResolveEventForPublishing(eventType, eventData);
(var eventName, eventData) = ResolveEventForPublishing(eventType, eventData);
await OnAddToOutboxAsync(eventName, eventType, resolvedEventData);
await OnAddToOutboxAsync(eventName, eventType, eventData);
var outgoingEventInfo = new OutgoingEventInfo(
GuidGenerator.Create(),
eventName,
SerializeEventData(resolvedEventData),
Serialize(eventData),
Clock.Now
);
@ -209,11 +214,13 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
}
}
eventData = GetEventData(eventData);
var incomingEventInfo = new IncomingEventInfo(
GuidGenerator.Create(),
messageId!,
eventName,
SerializeEventData(eventData),
Serialize(eventData),
Clock.Now
);
incomingEventInfo.SetCorrelationId(correlationId!);
@ -228,55 +235,6 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
protected abstract byte[] Serialize(object eventData);
protected virtual byte[] SerializeEventData(object eventData)
{
if (eventData is AnonymousEventData anonymousEventData)
{
return Encoding.UTF8.GetBytes(AnonymousEventDataConverter.GetJsonData(anonymousEventData));
}
return Serialize(eventData);
}
protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, byte[] eventData)
{
return AnonymousEventData.FromJson(eventName, Encoding.UTF8.GetString(eventData));
}
protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, string eventData)
{
return AnonymousEventData.FromJson(eventName, eventData);
}
protected virtual AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
protected virtual Task? TryPublishTypedByEventNameAsync(
string eventName,
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
var eventType = GetEventTypeByEventName(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox);
}
protected virtual Task PublishAnonymousByEventNameAsync(
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
protected virtual async Task TriggerHandlersDirectAsync(Type eventType, object eventData)
{
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
@ -339,7 +297,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
{
if (eventData is AnonymousEventData anonymousEventData)
{
return AnonymousEventDataConverter.ConvertToLooseObject(anonymousEventData);
return anonymousEventData.Data;
}
return eventData;

164
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -3,7 +3,6 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -77,8 +76,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
AnonymousEventNames.GetOrAdd(eventName, true);
LocalEventBus.Subscribe(eventName, handler);
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
return LocalEventBus.Subscribe(eventName, handler);
}
/// <inheritdoc/>
@ -86,8 +84,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
EventTypes.GetOrAdd(eventName, eventType);
LocalEventBus.Subscribe(eventType, factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
return LocalEventBus.Subscribe(eventType, factory);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
@ -109,14 +106,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
LocalEventBus.Unsubscribe(eventName, factory);
CleanupAnonymousEventName(eventName);
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
LocalEventBus.Unsubscribe(eventName, handler);
CleanupAnonymousEventName(eventName);
}
/// <inheritdoc/>
@ -129,7 +124,6 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public override void UnsubscribeAll(string eventName)
{
LocalEventBus.UnsubscribeAll(eventName);
CleanupAnonymousEventName(eventName);
}
/// <inheritdoc/>
@ -178,14 +172,25 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox)
?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox);
}
if (!AnonymousEventNames.ContainsKey(eventName))
{
throw new AbpException($"Unknown event name: {eventName}");
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
if (await AddToInboxAsync(Guid.NewGuid().ToString(), GetEventName(eventType, eventData), eventType, eventData, null))
if (await AddToInboxAsync(Guid.NewGuid().ToString(), EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, null))
{
return;
}
@ -214,12 +219,30 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
EventData = outgoingEvent.EventData
});
if (!TryResolveStoredEventType(outgoingEvent.EventName, out var eventType))
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
if (eventType == null)
{
return;
var isAnonymous = AnonymousEventNames.ContainsKey(outgoingEvent.EventName);
if (!isAnonymous)
{
return;
}
eventType = typeof(AnonymousEventData);
}
object eventData;
if (eventType == typeof(AnonymousEventData))
{
eventData = new AnonymousEventData(
outgoingEvent.EventName,
JsonSerializer.Deserialize<object>(outgoingEvent.EventData)!);
}
else
{
eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType)!;
}
var eventData = DeserializeStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, eventType);
if (await AddToInboxAsync(Guid.NewGuid().ToString(), outgoingEvent.EventName, eventType, eventData, null))
{
return;
@ -238,12 +261,30 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
if (!TryResolveStoredEventType(incomingEvent.EventName, out var eventType))
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
var isAnonymous = AnonymousEventNames.ContainsKey(incomingEvent.EventName);
if (!isAnonymous)
{
return;
}
eventType = typeof(AnonymousEventData);
}
object eventData;
if (eventType == typeof(AnonymousEventData))
{
eventData = new AnonymousEventData(
incomingEvent.EventName,
JsonSerializer.Deserialize<object>(incomingEvent.EventData)!);
}
else
{
eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType)!;
}
var eventData = DeserializeStoredEventData(incomingEvent.EventName, incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
@ -257,7 +298,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected override byte[] Serialize(object eventData)
{
return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventData));
return JsonSerializer.SerializeToUtf8Bytes(eventData);
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
@ -283,89 +324,4 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
return EventTypes.GetOrDefault(eventName);
}
protected override AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
protected override Task? TryPublishTypedByEventNameAsync(
string eventName,
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox);
}
protected override Task PublishAnonymousByEventNameAsync(
AnonymousEventData anonymousEventData,
bool onUnitOfWorkComplete,
bool useOutbox)
{
if (!HasAnonymousEventName(anonymousEventData.EventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox);
}
protected virtual bool TryResolveStoredEventType(string eventName, out Type eventType)
{
eventType = EventTypes.GetOrDefault(eventName)!;
if (eventType != null)
{
return true;
}
if (!HasAnonymousEventName(eventName))
{
return false;
}
eventType = typeof(AnonymousEventData);
return true;
}
protected virtual object DeserializeStoredEventData(string eventName, byte[] eventData, Type eventType)
{
if (eventType == typeof(AnonymousEventData))
{
return CreateAnonymousEventData(eventName, eventData);
}
return JsonSerializer.Deserialize(Encoding.UTF8.GetString(eventData), eventType)!;
}
protected virtual void CleanupAnonymousEventName(string eventName)
{
if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any())
{
AnonymousEventNames.TryRemove(eventName, out _);
}
}
protected virtual bool HasAnonymousEventName(string eventName)
{
if (!AnonymousEventNames.ContainsKey(eventName))
{
return false;
}
if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any())
{
AnonymousEventNames.TryRemove(eventName, out _);
return false;
}
return true;
}
}

16
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs

@ -174,7 +174,9 @@ public abstract class EventBusBase : IEventBus
actualEventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType))
{
var resolvedEventData = ResolveActualEventData(eventData, actualEventType);
var resolvedEventData = eventData is AnonymousEventData aed
? aed.ConvertToTypedObject(actualEventType)
: eventData;
var genericArg = actualEventType.GetGenericArguments()[0];
var baseArg = genericArg.GetTypeInfo().BaseType;
@ -207,7 +209,7 @@ public abstract class EventBusBase : IEventBus
{
if (eventData is AnonymousEventData anonymousEventData && handlerEventType != typeof(AnonymousEventData))
{
return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, handlerEventType);
return anonymousEventData.ConvertToTypedObject(handlerEventType);
}
if (handlerEventType == typeof(AnonymousEventData) && eventData is not AnonymousEventData)
@ -218,16 +220,6 @@ public abstract class EventBusBase : IEventBus
return eventData;
}
protected virtual object ResolveActualEventData(object eventData, Type actualEventType)
{
if (eventData is AnonymousEventData anonymousEventData)
{
return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, actualEventType);
}
return eventData;
}
protected void ThrowOriginalExceptions(Type eventType, List<Exception> exceptions)
{
if (exceptions.Count == 1)

153
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

@ -60,8 +60,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler)
{
var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName);
handlerFactories.Locking(factories =>
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories =>
{
if (!handler.IsInFactories(factories))
{
@ -141,33 +140,21 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandlerFactory factory)
{
if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Remove(factory));
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void Unsubscribe(string eventName, IEventHandler handler)
{
if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory singleFactory &&
singleFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
@ -179,21 +166,28 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
/// <inheritdoc/>
public override void UnsubscribeAll(string eventName)
{
if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories))
{
return;
}
handlerFactories.Locking(factories => factories.Clear());
CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories);
GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear());
}
/// <inheritdoc/>
public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true)
{
var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData);
return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete)
?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete);
var eventType = EventTypes.GetOrDefault(eventName);
var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
if (eventType != null)
{
return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete);
}
var isAnonymous = AnonymousEventHandlerFactories.ContainsKey(eventName);
if (!isAnonymous)
{
throw new AbpException($"Unknown event name: {eventName}");
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
@ -231,11 +225,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
foreach (var factory in handlerFactory.Value)
{
var handlerType = GetHandlerType(factory);
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
handlerFactory.Key,
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(handlerType).FirstOrDefault()?.Order ?? 0));
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
}
}
@ -243,11 +236,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
foreach (var factory in handlerFactory.Value)
{
var handlerType = GetHandlerType(factory);
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
typeof(AnonymousEventData),
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(handlerType).FirstOrDefault()?.Order ?? 0));
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
}
}
@ -268,7 +260,8 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
foreach (var factory in handlerFactory.Value)
{
var handlerType = GetHandlerType(factory);
using var handler = factory.GetHandler();
var handlerType = handler.EventHandler.GetType();
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
typeof(AnonymousEventData),
@ -297,88 +290,6 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
return AnonymousEventHandlerFactories.GetOrAdd(eventName, (name) => new List<IEventHandlerFactory>());
}
private bool TryGetAnonymousHandlerFactories(string eventName, out List<IEventHandlerFactory> handlerFactories)
{
return AnonymousEventHandlerFactories.TryGetValue(eventName, out handlerFactories!);
}
private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData)
{
return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData);
}
private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return null;
}
var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType);
return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete);
}
private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete)
{
if (!HasAnonymousHandlers(eventName))
{
return Task.CompletedTask;
}
return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete);
}
private static Type GetHandlerType(IEventHandlerFactory factory)
{
if (factory is SingleInstanceHandlerFactory singleInstanceHandlerFactory)
{
return singleInstanceHandlerFactory.HandlerInstance.GetType();
}
if (factory is IocEventHandlerFactory iocEventHandlerFactory)
{
return iocEventHandlerFactory.HandlerType;
}
if (factory is TransientEventHandlerFactory transientEventHandlerFactory)
{
return transientEventHandlerFactory.HandlerType;
}
using var handler = factory.GetHandler();
return handler.EventHandler.GetType();
}
private bool HasAnonymousHandlers(string eventName)
{
if (!AnonymousEventHandlerFactories.TryGetValue(eventName, out var handlerFactories))
{
return false;
}
var hasHandlers = false;
handlerFactories.Locking(factories => hasHandlers = factories.Count > 0);
if (!hasHandlers)
{
AnonymousEventHandlerFactories.TryRemove(eventName, out _);
}
return hasHandlers;
}
private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List<IEventHandlerFactory> handlerFactories)
{
var isEmpty = false;
handlerFactories.Locking(factories => isEmpty = factories.Count == 0);
if (isEmpty)
{
AnonymousEventHandlerFactories.TryRemove(eventName, out _);
}
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
if (handlerEventType == targetEventType)

144
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs

@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Domain.Entities.Events.Distributed;
@ -22,32 +21,20 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
[Fact]
public async Task Should_Call_Handler_AndDispose()
{
var handleCount = 0;
var disposeCount = 0;
var factory = new CountingDistributedEventHandlerFactory(
() => handleCount++,
() => disposeCount++);
using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory);
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
await DistributedEventBus.PublishAsync(new MySimpleEventData(1));
await DistributedEventBus.PublishAsync(new MySimpleEventData(2));
await DistributedEventBus.PublishAsync(new MySimpleEventData(3));
Assert.Equal(3, handleCount);
Assert.Equal(3, disposeCount);
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount);
}
[Fact]
public async Task Should_Handle_Typed_Handler_When_Published_With_EventName()
{
var handleCount = 0;
var disposeCount = 0;
var factory = new CountingDistributedEventHandlerFactory(
() => handleCount++,
() => disposeCount++);
using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory);
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData, MySimpleDistributedTransientEventHandler>();
var eventName = EventNameAttribute.GetNameOrDefault<MySimpleEventData>();
await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1));
@ -57,8 +44,8 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
});
await DistributedEventBus.PublishAsync(eventName, new { Value = 3 });
Assert.Equal(3, handleCount);
Assert.Equal(3, disposeCount);
Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount);
Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount);
}
[Fact]
@ -95,7 +82,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
{
handleCount++;
AnonymousEventDataConverter.ConvertToLooseObject(d).ShouldNotBeNull();
d.ConvertToTypedObject().ShouldNotBeNull();
await Task.CompletedTask;
})));
@ -200,15 +187,16 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
[Fact]
public async Task Should_Ignore_Unknown_Event_Name()
public async Task Should_Throw_For_Unknown_Event_Name()
{
await DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 });
await Assert.ThrowsAsync<AbpException>(() =>
DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }));
}
[Fact]
public async Task Should_Convert_AnonymousEventData_To_Typed_Object()
{
MySimpleEventData receivedData = null!;
MySimpleEventData? receivedData = null;
using var subscription = DistributedEventBus.Subscribe<MySimpleEventData>(async (data) =>
{
@ -223,63 +211,6 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
receivedData.Value.ShouldBe(42);
}
[Fact]
public async Task Should_Rehydrate_Anonymous_Event_From_Outbox_Using_Raw_Json()
{
var localDistributedEventBus = GetRequiredService<LocalDistributedEventBus>();
AnonymousEventData receivedData = null!;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
using var subscription = localDistributedEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async data =>
{
receivedData = data;
await Task.CompletedTask;
})));
var outgoingEvent = new OutgoingEventInfo(
Guid.NewGuid(),
eventName,
Encoding.UTF8.GetBytes("{\"Value\":42}"),
DateTime.UtcNow);
await localDistributedEventBus.PublishFromOutboxAsync(outgoingEvent, new OutboxConfig("Test") { DatabaseName = "Test" });
receivedData.ShouldNotBeNull();
receivedData.EventName.ShouldBe(eventName);
AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("{\"Value\":42}");
AnonymousEventDataConverter.ConvertToTypedObject<MySimpleEventData>(receivedData).Value.ShouldBe(42);
}
[Fact]
public async Task Should_Rehydrate_Anonymous_Event_From_Inbox_Using_Raw_Json()
{
var localDistributedEventBus = GetRequiredService<LocalDistributedEventBus>();
AnonymousEventData receivedData = null!;
var eventName = "MyEvent-" + Guid.NewGuid().ToString("N");
using var subscription = localDistributedEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async data =>
{
receivedData = data;
await Task.CompletedTask;
})));
var incomingEvent = new IncomingEventInfo(
Guid.NewGuid(),
Guid.NewGuid().ToString("N"),
eventName,
Encoding.UTF8.GetBytes("\"hello\""),
DateTime.UtcNow);
await localDistributedEventBus.ProcessFromInboxAsync(incomingEvent, new InboxConfig("Test") { DatabaseName = "Test" });
receivedData.ShouldNotBeNull();
receivedData.EventName.ShouldBe(eventName);
AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("\"hello\"");
AnonymousEventDataConverter.ConvertToTypedObject<string>(receivedData).ShouldBe("hello");
}
[Fact]
public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant()
{
@ -378,57 +309,4 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase
}
}
private sealed class CountingDistributedEventHandlerFactory : IEventHandlerFactory
{
private readonly Action _handleAction;
private readonly Action _disposeAction;
public CountingDistributedEventHandlerFactory(Action handleAction, Action disposeAction)
{
_handleAction = handleAction;
_disposeAction = disposeAction;
}
public IEventHandlerDisposeWrapper GetHandler()
{
var wasHandled = false;
return new EventHandlerDisposeWrapper(
new CountingDistributedEventHandler(
_handleAction,
() => wasHandled = true),
() =>
{
if (wasHandled)
{
_disposeAction();
}
}
);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories.Contains(this);
}
}
private sealed class CountingDistributedEventHandler : IDistributedEventHandler<MySimpleEventData>
{
private readonly Action _handleAction;
private readonly Action _markHandled;
public CountingDistributedEventHandler(Action handleAction, Action markHandled)
{
_handleAction = handleAction;
_markHandled = markHandled;
}
public Task HandleEventAsync(MySimpleEventData eventData)
{
_markHandled();
_handleAction();
return Task.CompletedTask;
}
}
}

36
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs

@ -48,7 +48,7 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
[Fact]
public async Task Should_Convert_Dictionary_To_Typed_Handler()
{
MySimpleEventData receivedData = null!;
MySimpleEventData? receivedData = null;
using var subscription = LocalEventBus.Subscribe<MySimpleEventData>(async (data) =>
{
@ -118,28 +118,29 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
}
[Fact]
public async Task Should_Ignore_Unknown_Event_Name()
public async Task Should_Throw_For_Unknown_Event_Name()
{
await LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 });
await Assert.ThrowsAsync<AbpException>(() =>
LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }));
}
[Fact]
public async Task Should_ConvertToTypedObject_In_Anonymous_Handler()
{
object receivedData = null!;
object? receivedData = null;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
{
receivedData = AnonymousEventDataConverter.ConvertToLooseObject(d);
receivedData = d.ConvertToTypedObject();
await Task.CompletedTask;
})));
await LocalEventBus.PublishAsync(eventName, new { Name = "Hello", Count = 42 });
receivedData.ShouldNotBeNull();
var dict = receivedData.ShouldBeOfType<Dictionary<string, object>>();
var dict = receivedData.ShouldBeOfType<Dictionary<string, object?>>();
dict["Name"].ShouldBe("Hello");
dict["Count"].ShouldBe(42L);
}
@ -147,13 +148,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
[Fact]
public async Task Should_ConvertToTypedObject_Generic_In_Anonymous_Handler()
{
MySimpleEventData receivedData = null!;
MySimpleEventData? receivedData = null;
var eventName = "TestEvent-" + Guid.NewGuid().ToString("N");
using var subscription = LocalEventBus.Subscribe(eventName,
new SingleInstanceHandlerFactory(new ActionEventHandler<AnonymousEventData>(async (d) =>
{
receivedData = AnonymousEventDataConverter.ConvertToTypedObject<MySimpleEventData>(d);
receivedData = d.ConvertToTypedObject<MySimpleEventData>();
await Task.CompletedTask;
})));
@ -162,23 +163,4 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase
receivedData.ShouldNotBeNull();
receivedData.Value.ShouldBe(99);
}
[Fact]
public void Should_Roundtrip_Raw_Json_Without_Object_Deserialization()
{
var eventData = AnonymousEventData.FromJson("TestEvent", "{\"Value\":42,\"Name\":\"hello\"}");
eventData.EventName.ShouldBe("TestEvent");
AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("{\"Value\":42,\"Name\":\"hello\"}");
AnonymousEventDataConverter.ConvertToTypedObject<MySimpleEventData>(eventData).Value.ShouldBe(42);
}
[Fact]
public void Should_Preserve_String_Payload_As_Raw_Json()
{
var eventData = AnonymousEventData.FromJson("TestEvent", "\"hello\"");
AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("\"hello\"");
AnonymousEventDataConverter.ConvertToTypedObject<string>(eventData).ShouldBe("hello");
}
}

Loading…
Cancel
Save