diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs index f770f5fbb2..079ed1287a 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text.Json; @@ -96,30 +96,11 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule if (IsAbpDaprEventData(data)) { - var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); - var eventType = distributedEventBus.GetEventType(daprEventData.Topic); - if (eventType != null) - { - var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType); - await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); - }else if (distributedEventBus.IsAnonymousEvent(daprEventData.Topic)) - { - var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object)); - await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId); - } + await TryHandleAbpDaprEnvelopeAsync(distributedEventBus, daprSerializer, data); } else { - var eventType = distributedEventBus.GetEventType(topic); - if (eventType != null) - { - var eventData = daprSerializer.Deserialize(data, eventType); - await distributedEventBus.TriggerHandlersAsync(eventType, eventData); - }else if (distributedEventBus.IsAnonymousEvent(topic)) - { - var eventData = daprSerializer.Deserialize(data, typeof(object)); - await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(topic, eventData)); - } + await TryHandleDirectDaprEventAsync(distributedEventBus, daprSerializer, topic!, data); } httpContext.Response.StatusCode = 200; @@ -136,4 +117,60 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) && objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase)); } + + private static async Task TryHandleAbpDaprEnvelopeAsync( + DaprDistributedEventBus distributedEventBus, + IDaprSerializer daprSerializer, + string data) + { + var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); + if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, daprEventData.Topic, daprEventData.JsonData, out var eventType, out var eventData)) + { + return; + } + + await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); + } + + private static async Task TryHandleDirectDaprEventAsync( + DaprDistributedEventBus distributedEventBus, + IDaprSerializer daprSerializer, + string topic, + string data) + { + if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, topic, data, out var eventType, out var eventData)) + { + return; + } + + await distributedEventBus.TriggerHandlersAsync(eventType, eventData); + } + + private static bool TryResolveIncomingEvent( + DaprDistributedEventBus distributedEventBus, + IDaprSerializer daprSerializer, + string topic, + string data, + out Type eventType, + out object eventData) + { + var typedEventType = distributedEventBus.GetEventType(topic); + if (typedEventType != null) + { + eventType = typedEventType; + eventData = daprSerializer.Deserialize(data, typedEventType); + return true; + } + + if (!distributedEventBus.IsAnonymousEvent(topic)) + { + eventType = default!; + eventData = default!; + return false; + } + + eventType = typeof(AnonymousEventData); + eventData = AnonymousEventData.FromJson(topic, data); + return true; + } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs index 1db318b40e..255b4cffff 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs @@ -1,8 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text.Json; - namespace Volo.Abp.EventBus; /// @@ -18,11 +14,14 @@ public class AnonymousEventData public string EventName { get; } /// - /// The raw event data payload. Can be a CLR object, , or any serializable object. + /// The raw event data payload. Can be a CLR object, , or any serializable object. /// - public object Data { get; } + internal object Data { get; } - private JsonElement? _cachedJsonElement; + /// + /// The raw JSON payload when the event is created from transport data. + /// + public string? JsonData { get; } /// /// Creates a new instance of . @@ -36,105 +35,17 @@ public class AnonymousEventData } /// - /// Converts the to a loosely-typed object graph - /// (dictionaries for objects, lists for arrays, primitives for values). - /// - /// A CLR object representation of the event data - public object ConvertToTypedObject() - { - return ConvertElement(GetJsonElement()); - } - - /// - /// Converts the to a strongly-typed object. - /// Returns the data directly if it is already of type , - /// otherwise deserializes from JSON. - /// - /// Target type to convert to - /// The deserialized object of type - /// Thrown when deserialization fails - public T ConvertToTypedObject() - { - if (Data is T typedData) - { - return typedData; - } - - return GetJsonElement().Deserialize() - ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}."); - } - - /// - /// Converts the to the specified . - /// Returns the data directly if it is already an instance of the target type, - /// otherwise deserializes from JSON. + /// Creates a new instance of from raw JSON. /// - /// Target type to convert to - /// The deserialized object - /// Thrown when deserialization fails - public object ConvertToTypedObject(Type type) + public static AnonymousEventData FromJson(string eventName, string jsonData) { - if (type.IsInstanceOfType(Data)) - { - return Data; - } - - return GetJsonElement().Deserialize(type) - ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}."); + return new AnonymousEventData(eventName, data: null!, jsonData); } - private JsonElement GetJsonElement() + private AnonymousEventData(string eventName, object data, string? jsonData) { - if (_cachedJsonElement.HasValue) - { - return _cachedJsonElement.Value; - } - - if (Data is JsonElement existingElement) - { - _cachedJsonElement = existingElement; - return existingElement; - } - - _cachedJsonElement = JsonSerializer.SerializeToElement(Data); - return _cachedJsonElement.Value; - } - - private static object ConvertElement(JsonElement element) - { - switch (element.ValueKind) - { - case JsonValueKind.Object: - { - var obj = new Dictionary(); - foreach (var property in element.EnumerateObject()) - { - obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null - ? null - : ConvertElement(property.Value); - } - return obj; - } - case JsonValueKind.Array: - return element.EnumerateArray() - .Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item)) - .ToList(); - case JsonValueKind.String: - return element.GetString()!; - case JsonValueKind.Number when element.TryGetInt64(out var longValue): - return longValue; - case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue): - return decimalValue; - case JsonValueKind.Number when element.TryGetDouble(out var doubleValue): - return doubleValue; - case JsonValueKind.True: - return true; - case JsonValueKind.False: - return false; - case JsonValueKind.Null: - case JsonValueKind.Undefined: - default: - return null!; - } + EventName = eventName; + Data = data; + JsonData = jsonData; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs new file mode 100644 index 0000000000..e4b9234ccd --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; + +namespace Volo.Abp.EventBus; + +public static class AnonymousEventDataConverter +{ + public static T ConvertToTypedObject(AnonymousEventData eventData) + { + if (eventData.Data is T typedData) + { + return typedData; + } + + return ParseJsonElement(eventData).Deserialize() + ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}."); + } + + public static object ConvertToTypedObject(AnonymousEventData eventData, Type type) + { + if (type.IsInstanceOfType(eventData.Data)) + { + return eventData.Data; + } + + return ParseJsonElement(eventData).Deserialize(type) + ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}."); + } + + public static object ConvertToLooseObject(AnonymousEventData eventData) + { + return ConvertElement(ParseJsonElement(eventData)); + } + + public static string GetJsonData(AnonymousEventData eventData) + { + return eventData.JsonData ?? ParseJsonElement(eventData).GetRawText(); + } + + private static JsonElement ParseJsonElement(AnonymousEventData eventData) + { + if (eventData.Data is JsonElement existingElement) + { + return existingElement; + } + + if (eventData.JsonData != null) + { + return JsonDocument.Parse(eventData.JsonData).RootElement.Clone(); + } + + return JsonSerializer.SerializeToElement(eventData.Data); + } + + private static object ConvertElement(JsonElement element) + { + switch (element.ValueKind) + { + case JsonValueKind.Object: + { + var obj = new Dictionary(); + foreach (var property in element.EnumerateObject()) + { + obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null + ? null + : ConvertElement(property.Value); + } + + return obj; + } + case JsonValueKind.Array: + return element.EnumerateArray() + .Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item)) + .ToList(); + case JsonValueKind.String: + return element.GetString()!; + case JsonValueKind.Number when element.TryGetInt64(out var longValue): + return longValue; + case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue): + return decimalValue; + case JsonValueKind.Number when element.TryGetDouble(out var doubleValue): + return doubleValue; + case JsonValueKind.True: + return true; + case JsonValueKind.False: + return false; + case JsonValueKind.Null: + case JsonValueKind.Undefined: + default: + return null!; + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 020757ad07..83eba73fa3 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -2,7 +2,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Text.Json; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Microsoft.Extensions.DependencyInjection; @@ -92,10 +91,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); } - else if (AnonymousHandlerFactories.ContainsKey(eventName)) + else if (HasAnonymousHandlers(eventName)) { - var data = Serializer.Deserialize(message.Body.ToArray()); - eventData = new AnonymousEventData(eventName, data); + eventData = CreateAnonymousEventData(eventName, Serializer.Deserialize(message.Body.ToArray())); eventType = typeof(AnonymousEventData); } else @@ -117,14 +115,21 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); + var added = false; + handlerFactories.Locking(factories => + { + if (!factory.IsInFactories(factories)) + { + factories.Add(factory); + added = true; + } + }); - if (factory.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(factory); - return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -132,14 +137,21 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); + var added = false; + handlerFactories.Locking(factories => + { + if (!handler.IsInFactories(factories)) + { + factories.Add(handler); + added = true; + } + }); - if (handler.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(handler); - return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -199,20 +211,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var eventType = EventTypes.GetOrDefault(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); - } - - if (AnonymousHandlerFactories.ContainsKey(eventName)) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - throw new AbpException($"Unknown event name: {eventName}"); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) + ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -287,20 +288,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) - { - var element = Serializer.Deserialize(incomingEvent.EventData); - eventData = new AnonymousEventData(incomingEvent.EventName, element); - eventType = typeof(AnonymousEventData); - } - else + if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -400,29 +388,45 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => factories.Remove(factory)); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Remove(factory)); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void UnsubscribeAll(string eventName) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => factories.Clear()); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Clear()); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -448,6 +452,81 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } + private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); + } + + private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + if (!HasAnonymousHandlers(eventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) + { + eventType = EventTypes.GetOrDefault(eventName)!; + if (eventType != null) + { + eventData = Serializer.Deserialize(payload, eventType); + return true; + } + + if (!HasAnonymousHandlers(eventName)) + { + eventData = default!; + eventType = default!; + return false; + } + + eventType = typeof(AnonymousEventData); + eventData = CreateAnonymousEventData(eventName, payload); + return true; + } + + private bool HasAnonymousHandlers(string eventName) + { + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return false; + } + + var hasHandlers = false; + handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); + if (!hasHandlers) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + + return hasHandlers; + } + + private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) + { + var isEmpty = false; + handlerFactories.Locking(factories => isEmpty = factories.Count == 0); + if (isEmpty) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 1231daf632..13edc0106d 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -70,14 +70,21 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); + var added = false; + handlerFactories.Locking(factories => + { + if (!factory.IsInFactories(factories)) + { + factories.Add(factory); + added = true; + } + }); - if (factory.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(factory); - return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -85,14 +92,21 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); + var added = false; + handlerFactories.Locking(factories => + { + if (!handler.IsInFactories(factories)) + { + factories.Add(handler); + added = true; + } + }); - if (handler.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(handler); - return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -149,20 +163,9 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var eventType = EventTypes.GetOrDefault(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); - } - - if (AnonymousHandlerFactories.ContainsKey(eventName)) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - throw new AbpException($"Unknown event name: {eventName}"); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) + ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -178,18 +181,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) - { - eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object)); - } - else + if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -230,19 +222,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) - { - eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); - eventType = typeof(AnonymousEventData); - } - else + if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -327,33 +307,51 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public bool IsAnonymousEvent(string eventName) { - return AnonymousHandlerFactories.ContainsKey(eventName); + return HasAnonymousHandlers(eventName); } /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Remove(factory)); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void UnsubscribeAll(string eventName) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Clear()); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -379,6 +377,81 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } + private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); + } + + private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + if (!HasAnonymousHandlers(eventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) + { + eventType = EventTypes.GetOrDefault(eventName)!; + if (eventType != null) + { + eventData = Serializer.Deserialize(payload, eventType); + return true; + } + + if (!HasAnonymousHandlers(eventName)) + { + eventData = default!; + eventType = default!; + return false; + } + + eventType = typeof(AnonymousEventData); + eventData = CreateAnonymousEventData(eventName, payload); + return true; + } + + private bool HasAnonymousHandlers(string eventName) + { + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return false; + } + + var hasHandlers = false; + handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); + if (!hasHandlers) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + + return hasHandlers; + } + + private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) + { + var isEmpty = false; + handlerFactories.Locking(factories => isEmpty = factories.Count == 0); + if (isEmpty) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index e7e44e449d..be82c603c1 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -2,7 +2,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Text.Json; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; @@ -92,10 +91,9 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen { eventData = Serializer.Deserialize(message.Value, eventType); } - else if (AnonymousHandlerFactories.ContainsKey(eventName)) + else if (HasAnonymousHandlers(eventName)) { - var element = Serializer.Deserialize(message.Value); - eventData = new AnonymousEventData(eventName, element); + eventData = CreateAnonymousEventData(eventName, message.Value); eventType = typeof(AnonymousEventData); } else @@ -117,14 +115,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); + var added = false; + handlerFactories.Locking(factories => + { + if (!factory.IsInFactories(factories)) + { + factories.Add(factory); + added = true; + } + }); - if (factory.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(factory); - return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -132,14 +137,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); + var added = false; + handlerFactories.Locking(factories => + { + if (!handler.IsInFactories(factories)) + { + factories.Add(handler); + added = true; + } + }); - if (handler.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(handler); - return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -200,20 +212,9 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var eventType = EventTypes.GetOrDefault(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); - } - - if (AnonymousHandlerFactories.ContainsKey(eventName)) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - throw new AbpException($"Unknown event name: {eventName}"); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) + ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); } protected override async Task PublishToEventBusAsync(Type eventType, object eventData) @@ -325,20 +326,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) - { - var element = Serializer.Deserialize(incomingEvent.EventData); - eventData = new AnonymousEventData(incomingEvent.EventName, element); - eventType = typeof(AnonymousEventData); - } - else + if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -436,27 +424,45 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Remove(factory)); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void UnsubscribeAll(string eventName) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Clear()); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -482,6 +488,81 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } + private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); + } + + private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + if (!HasAnonymousHandlers(eventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) + { + eventType = EventTypes.GetOrDefault(eventName)!; + if (eventType != null) + { + eventData = Serializer.Deserialize(payload, eventType); + return true; + } + + if (!HasAnonymousHandlers(eventName)) + { + eventData = default!; + eventType = default!; + return false; + } + + eventType = typeof(AnonymousEventData); + eventData = CreateAnonymousEventData(eventName, payload); + return true; + } + + private bool HasAnonymousHandlers(string eventName) + { + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return false; + } + + var hasHandlers = false; + handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); + if (!hasHandlers) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + + return hasHandlers; + } + + private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) + { + var isEmpty = false; + handlerFactories.Locking(factories => isEmpty = factories.Count == 0); + if (isEmpty) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 771dae9f09..097f2cbbe7 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -109,10 +109,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis { eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); } - else if (AnonymousHandlerFactories.ContainsKey(eventName)) + else if (HasAnonymousHandlers(eventName)) { eventType = typeof(AnonymousEventData); - eventData = new AnonymousEventData(eventName, Serializer.Deserialize(ea.Body.ToArray())); + eventData = CreateAnonymousEventData(eventName, ea.Body.ToArray()); } else { @@ -134,15 +134,24 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); + var added = false; + var isFirstHandler = false; + handlerFactories.Locking(factories => + { + if (!factory.IsInFactories(factories)) + { + isFirstHandler = factories.Count == 0; + factories.Add(factory); + added = true; + } + }); - if (factory.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(factory); - - if (handlerFactories.Count == 1) //TODO: Multi-threading! + if (isFirstHandler) { Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); } @@ -154,15 +163,24 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - - if (handler.IsInFactories(handlerFactories)) + var added = false; + var isFirstHandler = false; + handlerFactories.Locking(factories => + { + if (!handler.IsInFactories(factories)) + { + isFirstHandler = factories.Count == 0; + factories.Add(handler); + added = true; + } + }); + + if (!added) { return NullDisposable.Instance; } - - handlerFactories.Add(handler); - - if (handlerFactories.Count == 1) //TODO: Multi-threading! + + if (isFirstHandler) { Consumer.BindAsync(eventName); } @@ -227,20 +245,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var eventType = EventTypes.GetOrDefault(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); - } - - if (AnonymousHandlerFactories.ContainsKey(eventName)) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - throw new AbpException($"Unknown event name: {eventName}"); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) + ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -305,19 +312,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) - { - eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData)); - eventType = typeof(AnonymousEventData); - } - else + if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -487,27 +482,45 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Remove(factory)); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void UnsubscribeAll(string eventName) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Clear()); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -533,6 +546,81 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } + private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); + } + + private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + if (!HasAnonymousHandlers(eventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) + { + eventType = EventTypes.GetOrDefault(eventName)!; + if (eventType != null) + { + eventData = Serializer.Deserialize(payload, eventType); + return true; + } + + if (!HasAnonymousHandlers(eventName)) + { + eventData = default!; + eventType = default!; + return false; + } + + eventType = typeof(AnonymousEventData); + eventData = CreateAnonymousEventData(eventName, payload); + return true; + } + + private bool HasAnonymousHandlers(string eventName) + { + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return false; + } + + var hasHandlers = false; + handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); + if (!hasHandlers) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + + return hasHandlers; + } + + private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) + { + var isEmpty = false; + handlerFactories.Locking(factories => isEmpty = factories.Count == 0); + if (isEmpty) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index c4f1a141e6..bf1bd1a585 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -100,15 +100,24 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); + var added = false; + var isFirstHandler = false; + handlerFactories.Locking(factories => + { + if (!factory.IsInFactories(factories)) + { + isFirstHandler = factories.Count == 0; + factories.Add(factory); + added = true; + } + }); - if (factory.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(factory); - - if (handlerFactories.Count == 1) //TODO: Multi-threading! + if (isFirstHandler) { Rebus.Subscribe(eventType); } @@ -120,15 +129,24 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); + var added = false; + var isFirstHandler = false; + handlerFactories.Locking(factories => + { + if (!handler.IsInFactories(factories)) + { + isFirstHandler = factories.Count == 0; + factories.Add(handler); + added = true; + } + }); - if (handler.IsInFactories(handlerFactories)) + if (!added) { return NullDisposable.Instance; } - handlerFactories.Add(handler); - - if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading! + if (isFirstHandler && AnonymousHandlerFactories.Count == 1) { Rebus.Subscribe(typeof(AnonymousEventData)); } @@ -193,20 +211,9 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var eventType = EventTypes.GetOrDefault(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); - } - - if (AnonymousHandlerFactories.ContainsKey(eventName)) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - throw new AbpException($"Unknown event name: {eventName}"); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) + ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -228,19 +235,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) - { - eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object))); - eventType = typeof(AnonymousEventData); - } - else + if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -273,15 +268,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { await PublishFromOutboxAsync(outgoingEvent, outboxConfig); - using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) - { - await TriggerDistributedEventSentAsync(new DistributedEventSent() - { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); - } } await scope.CompleteAsync(); @@ -292,19 +278,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - object eventData; - - if (eventType != null) - { - eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); - } - else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) - { - eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); - eventType = typeof(AnonymousEventData); - } - else + if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) { return; } @@ -393,27 +367,45 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Remove(factory)); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void UnsubscribeAll(string eventName) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Clear()); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -439,6 +431,81 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } + private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); + } + + private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + if (!HasAnonymousHandlers(eventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) + { + eventType = EventTypes.GetOrDefault(eventName)!; + if (eventType != null) + { + eventData = Serializer.Deserialize(payload, eventType); + return true; + } + + if (!HasAnonymousHandlers(eventName)) + { + eventData = default!; + eventType = default!; + return false; + } + + eventType = typeof(AnonymousEventData); + eventData = CreateAnonymousEventData(eventName, payload); + return true; + } + + private bool HasAnonymousHandlers(string eventName) + { + if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return false; + } + + var hasHandlers = false; + handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); + if (!hasHandlers) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + + return hasHandlers; + } + + private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) + { + var isEmpty = false; + handlerFactories.Locking(factories => isEmpty = factories.Count == 0); + if (isEmpty) + { + AnonymousHandlerFactories.TryRemove(eventName, out _); + } + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 35162a0007..a5ffb87245 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -112,15 +113,9 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB bool onUnitOfWorkComplete = true, bool useOutbox = true) { - var eventType = GetEventTypeByEventName(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox); - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox) + ?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox); } public abstract Task PublishFromOutboxAsync( @@ -152,14 +147,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) { var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); - (var eventName, eventData) = ResolveEventForPublishing(eventType, eventData); + var (eventName, resolvedEventData) = ResolveEventForPublishing(eventType, eventData); - await OnAddToOutboxAsync(eventName, eventType, eventData); + await OnAddToOutboxAsync(eventName, eventType, resolvedEventData); var outgoingEventInfo = new OutgoingEventInfo( GuidGenerator.Create(), eventName, - Serialize(eventData), + SerializeEventData(resolvedEventData), Clock.Now ); @@ -214,13 +209,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB } } - eventData = GetEventData(eventData); - var incomingEventInfo = new IncomingEventInfo( GuidGenerator.Create(), messageId!, eventName, - Serialize(eventData), + SerializeEventData(eventData), Clock.Now ); incomingEventInfo.SetCorrelationId(correlationId!); @@ -235,6 +228,55 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB protected abstract byte[] Serialize(object eventData); + protected virtual byte[] SerializeEventData(object eventData) + { + if (eventData is AnonymousEventData anonymousEventData) + { + return Encoding.UTF8.GetBytes(AnonymousEventDataConverter.GetJsonData(anonymousEventData)); + } + + return Serialize(eventData); + } + + protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, byte[] eventData) + { + return AnonymousEventData.FromJson(eventName, Encoding.UTF8.GetString(eventData)); + } + + protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, string eventData) + { + return AnonymousEventData.FromJson(eventName, eventData); + } + + protected virtual AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + protected virtual Task? TryPublishTypedByEventNameAsync( + string eventName, + AnonymousEventData anonymousEventData, + bool onUnitOfWorkComplete, + bool useOutbox) + { + var eventType = GetEventTypeByEventName(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox); + } + + protected virtual Task PublishAnonymousByEventNameAsync( + AnonymousEventData anonymousEventData, + bool onUnitOfWorkComplete, + bool useOutbox) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); + } + protected virtual async Task TriggerHandlersDirectAsync(Type eventType, object eventData) { await TriggerDistributedEventReceivedAsync(new DistributedEventReceived @@ -297,7 +339,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { if (eventData is AnonymousEventData anonymousEventData) { - return anonymousEventData.ConvertToTypedObject(); + return AnonymousEventDataConverter.ConvertToLooseObject(anonymousEventData); } return eventData; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index 922bc42909..ede70885ff 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -77,7 +77,8 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { AnonymousEventNames.GetOrAdd(eventName, true); - return LocalEventBus.Subscribe(eventName, handler); + LocalEventBus.Subscribe(eventName, handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } /// @@ -85,7 +86,8 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var eventName = EventNameAttribute.GetNameOrDefault(eventType); EventTypes.GetOrAdd(eventName, eventType); - return LocalEventBus.Subscribe(eventType, factory); + LocalEventBus.Subscribe(eventType, factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } public override void Unsubscribe(Func action) @@ -107,12 +109,14 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { LocalEventBus.Unsubscribe(eventName, factory); + CleanupAnonymousEventName(eventName); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { LocalEventBus.Unsubscribe(eventName, handler); + CleanupAnonymousEventName(eventName); } /// @@ -125,6 +129,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override void UnsubscribeAll(string eventName) { LocalEventBus.UnsubscribeAll(eventName); + CleanupAnonymousEventName(eventName); } /// @@ -173,25 +178,14 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { - var eventType = EventTypes.GetOrDefault(eventName); - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox); - } - - if (!AnonymousEventNames.ContainsKey(eventName)) - { - throw new AbpException($"Unknown event name: {eventName}"); - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox) + ?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - if (await AddToInboxAsync(Guid.NewGuid().ToString(), EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, null)) + if (await AddToInboxAsync(Guid.NewGuid().ToString(), GetEventName(eventType, eventData), eventType, eventData, null)) { return; } @@ -220,19 +214,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); - var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); - if (eventType == null) + if (!TryResolveStoredEventType(outgoingEvent.EventName, out var eventType)) { - var isAnonymous = AnonymousEventNames.ContainsKey(outgoingEvent.EventName); - if (!isAnonymous) - { - return; - } - - eventType = typeof(AnonymousEventData); + return; } - var eventData = JsonSerializer.Deserialize(Encoding.UTF8.GetString(outgoingEvent.EventData), eventType)!; + var eventData = DeserializeStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, eventType); if (await AddToInboxAsync(Guid.NewGuid().ToString(), outgoingEvent.EventName, eventType, eventData, null)) { return; @@ -251,19 +238,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - if (eventType == null) + if (!TryResolveStoredEventType(incomingEvent.EventName, out var eventType)) { - var isAnonymous = AnonymousEventNames.ContainsKey(incomingEvent.EventName); - if (!isAnonymous) - { - return; - } - - eventType = typeof(AnonymousEventData); + return; } - var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType); + var eventData = DeserializeStoredEventData(incomingEvent.EventName, incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -303,4 +283,89 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return EventTypes.GetOrDefault(eventName); } + + protected override AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + protected override Task? TryPublishTypedByEventNameAsync( + string eventName, + AnonymousEventData anonymousEventData, + bool onUnitOfWorkComplete, + bool useOutbox) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox); + } + + protected override Task PublishAnonymousByEventNameAsync( + AnonymousEventData anonymousEventData, + bool onUnitOfWorkComplete, + bool useOutbox) + { + if (!HasAnonymousEventName(anonymousEventData.EventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); + } + + protected virtual bool TryResolveStoredEventType(string eventName, out Type eventType) + { + eventType = EventTypes.GetOrDefault(eventName)!; + if (eventType != null) + { + return true; + } + + if (!HasAnonymousEventName(eventName)) + { + return false; + } + + eventType = typeof(AnonymousEventData); + return true; + } + + protected virtual object DeserializeStoredEventData(string eventName, byte[] eventData, Type eventType) + { + if (eventType == typeof(AnonymousEventData)) + { + return CreateAnonymousEventData(eventName, eventData); + } + + return JsonSerializer.Deserialize(Encoding.UTF8.GetString(eventData), eventType)!; + } + + protected virtual void CleanupAnonymousEventName(string eventName) + { + if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any()) + { + AnonymousEventNames.TryRemove(eventName, out _); + } + } + + protected virtual bool HasAnonymousEventName(string eventName) + { + if (!AnonymousEventNames.ContainsKey(eventName)) + { + return false; + } + + if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any()) + { + AnonymousEventNames.TryRemove(eventName, out _); + return false; + } + + return true; + } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index 8c344f5aa7..b4b589d6dd 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -174,9 +174,7 @@ public abstract class EventBusBase : IEventBus actualEventType.GetGenericArguments().Length == 1 && typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType)) { - var resolvedEventData = eventData is AnonymousEventData aed - ? aed.ConvertToTypedObject(actualEventType) - : eventData; + var resolvedEventData = ResolveActualEventData(eventData, actualEventType); var genericArg = actualEventType.GetGenericArguments()[0]; var baseArg = genericArg.GetTypeInfo().BaseType; @@ -209,7 +207,7 @@ public abstract class EventBusBase : IEventBus { if (eventData is AnonymousEventData anonymousEventData && handlerEventType != typeof(AnonymousEventData)) { - return anonymousEventData.ConvertToTypedObject(handlerEventType); + return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, handlerEventType); } if (handlerEventType == typeof(AnonymousEventData) && eventData is not AnonymousEventData) @@ -220,6 +218,16 @@ public abstract class EventBusBase : IEventBus return eventData; } + protected virtual object ResolveActualEventData(object eventData, Type actualEventType) + { + if (eventData is AnonymousEventData anonymousEventData) + { + return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, actualEventType); + } + + return eventData; + } + protected void ThrowOriginalExceptions(Type eventType, List exceptions) { if (exceptions.Count == 1) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 989191528c..e8f0715d8d 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -60,7 +60,8 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => + var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); + handlerFactories.Locking(factories => { if (!handler.IsInFactories(factories)) { @@ -140,21 +141,33 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Remove(factory)); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - GetOrCreateAnonymousHandlerFactories(eventName) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); + if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// @@ -166,28 +179,21 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override void UnsubscribeAll(string eventName) { - GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); + if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) + { + return; + } + + handlerFactories.Locking(factories => factories.Clear()); + CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); } /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var eventType = EventTypes.GetOrDefault(eventName); - - var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - - if (eventType != null) - { - return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); - } - - var isAnonymous = AnonymousEventHandlerFactories.ContainsKey(eventName); - if (!isAnonymous) - { - throw new AbpException($"Unknown event name: {eventName}"); - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); + return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) + ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); } protected override async Task PublishToEventBusAsync(Type eventType, object eventData) @@ -225,10 +231,11 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { + using var handler = factory.GetHandler(); handlerFactoryList.Add(new Tuple( factory, handlerFactory.Key, - ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(handler.EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); } } @@ -236,10 +243,11 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { + using var handler = factory.GetHandler(); handlerFactoryList.Add(new Tuple( factory, typeof(AnonymousEventData), - ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(handler.EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); } } @@ -290,6 +298,67 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency return AnonymousEventHandlerFactories.GetOrAdd(eventName, (name) => new List()); } + private bool TryGetAnonymousHandlerFactories(string eventName, out List handlerFactories) + { + return AnonymousEventHandlerFactories.TryGetValue(eventName, out handlerFactories!); + } + + private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) + { + return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + } + + private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return null; + } + + var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); + return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); + } + + private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) + { + if (!HasAnonymousHandlers(eventName)) + { + return Task.CompletedTask; + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + private bool HasAnonymousHandlers(string eventName) + { + if (!AnonymousEventHandlerFactories.TryGetValue(eventName, out var handlerFactories)) + { + return false; + } + + var hasHandlers = false; + handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); + + if (!hasHandlers) + { + AnonymousEventHandlerFactories.TryRemove(eventName, out _); + } + + return hasHandlers; + } + + private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) + { + var isEmpty = false; + handlerFactories.Locking(factories => isEmpty = factories.Count == 0); + + if (isEmpty) + { + AnonymousEventHandlerFactories.TryRemove(eventName, out _); + } + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { if (handlerEventType == targetEventType) diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs index 5ceb4b1bb5..673454c6be 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Text; using System.Threading.Tasks; using Shouldly; using Volo.Abp.Domain.Entities.Events.Distributed; @@ -21,20 +22,32 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase [Fact] public async Task Should_Call_Handler_AndDispose() { - using var subscription = DistributedEventBus.Subscribe(); + var handleCount = 0; + var disposeCount = 0; + var factory = new CountingDistributedEventHandlerFactory( + () => handleCount++, + () => disposeCount++); + + using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory); await DistributedEventBus.PublishAsync(new MySimpleEventData(1)); await DistributedEventBus.PublishAsync(new MySimpleEventData(2)); await DistributedEventBus.PublishAsync(new MySimpleEventData(3)); - Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); - Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); + Assert.Equal(3, handleCount); + Assert.Equal(3, disposeCount); } [Fact] public async Task Should_Handle_Typed_Handler_When_Published_With_EventName() { - using var subscription = DistributedEventBus.Subscribe(); + var handleCount = 0; + var disposeCount = 0; + var factory = new CountingDistributedEventHandlerFactory( + () => handleCount++, + () => disposeCount++); + + using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory); var eventName = EventNameAttribute.GetNameOrDefault(); await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1)); @@ -44,8 +57,8 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase }); await DistributedEventBus.PublishAsync(eventName, new { Value = 3 }); - Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); - Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); + Assert.Equal(3, handleCount); + Assert.Equal(3, disposeCount); } [Fact] @@ -82,7 +95,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { handleCount++; - d.ConvertToTypedObject().ShouldNotBeNull(); + AnonymousEventDataConverter.ConvertToLooseObject(d).ShouldNotBeNull(); await Task.CompletedTask; }))); @@ -187,16 +200,15 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } [Fact] - public async Task Should_Throw_For_Unknown_Event_Name() + public async Task Should_Ignore_Unknown_Event_Name() { - await Assert.ThrowsAsync(() => - DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 })); + await DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); } [Fact] public async Task Should_Convert_AnonymousEventData_To_Typed_Object() { - MySimpleEventData? receivedData = null; + MySimpleEventData receivedData = null!; using var subscription = DistributedEventBus.Subscribe(async (data) => { @@ -211,6 +223,63 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase receivedData.Value.ShouldBe(42); } + [Fact] + public async Task Should_Rehydrate_Anonymous_Event_From_Outbox_Using_Raw_Json() + { + var localDistributedEventBus = GetRequiredService(); + AnonymousEventData receivedData = null!; + var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = localDistributedEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async data => + { + receivedData = data; + await Task.CompletedTask; + }))); + + var outgoingEvent = new OutgoingEventInfo( + Guid.NewGuid(), + eventName, + Encoding.UTF8.GetBytes("{\"Value\":42}"), + DateTime.UtcNow); + + await localDistributedEventBus.PublishFromOutboxAsync(outgoingEvent, new OutboxConfig("Test") { DatabaseName = "Test" }); + + receivedData.ShouldNotBeNull(); + receivedData.EventName.ShouldBe(eventName); + AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("{\"Value\":42}"); + AnonymousEventDataConverter.ConvertToTypedObject(receivedData).Value.ShouldBe(42); + } + + [Fact] + public async Task Should_Rehydrate_Anonymous_Event_From_Inbox_Using_Raw_Json() + { + var localDistributedEventBus = GetRequiredService(); + AnonymousEventData receivedData = null!; + var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = localDistributedEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async data => + { + receivedData = data; + await Task.CompletedTask; + }))); + + var incomingEvent = new IncomingEventInfo( + Guid.NewGuid(), + Guid.NewGuid().ToString("N"), + eventName, + Encoding.UTF8.GetBytes("\"hello\""), + DateTime.UtcNow); + + await localDistributedEventBus.ProcessFromInboxAsync(incomingEvent, new InboxConfig("Test") { DatabaseName = "Test" }); + + receivedData.ShouldNotBeNull(); + receivedData.EventName.ShouldBe(eventName); + AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("\"hello\""); + AnonymousEventDataConverter.ConvertToTypedObject(receivedData).ShouldBe("hello"); + } + [Fact] public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant() { @@ -309,4 +378,57 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } } + private sealed class CountingDistributedEventHandlerFactory : IEventHandlerFactory + { + private readonly Action _handleAction; + private readonly Action _disposeAction; + + public CountingDistributedEventHandlerFactory(Action handleAction, Action disposeAction) + { + _handleAction = handleAction; + _disposeAction = disposeAction; + } + + public IEventHandlerDisposeWrapper GetHandler() + { + var wasHandled = false; + return new EventHandlerDisposeWrapper( + new CountingDistributedEventHandler( + _handleAction, + () => wasHandled = true), + () => + { + if (wasHandled) + { + _disposeAction(); + } + } + ); + } + + public bool IsInFactories(List handlerFactories) + { + return handlerFactories.Contains(this); + } + } + + private sealed class CountingDistributedEventHandler : IDistributedEventHandler + { + private readonly Action _handleAction; + private readonly Action _markHandled; + + public CountingDistributedEventHandler(Action handleAction, Action markHandled) + { + _handleAction = handleAction; + _markHandled = markHandled; + } + + public Task HandleEventAsync(MySimpleEventData eventData) + { + _markHandled(); + _handleAction(); + return Task.CompletedTask; + } + } + } diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs index 7d3e5748e9..dfcd74bed8 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs @@ -48,7 +48,7 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase [Fact] public async Task Should_Convert_Dictionary_To_Typed_Handler() { - MySimpleEventData? receivedData = null; + MySimpleEventData receivedData = null!; using var subscription = LocalEventBus.Subscribe(async (data) => { @@ -118,29 +118,28 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase } [Fact] - public async Task Should_Throw_For_Unknown_Event_Name() + public async Task Should_Ignore_Unknown_Event_Name() { - await Assert.ThrowsAsync(() => - LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 })); + await LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); } [Fact] public async Task Should_ConvertToTypedObject_In_Anonymous_Handler() { - object? receivedData = null; + object receivedData = null!; var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); using var subscription = LocalEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { - receivedData = d.ConvertToTypedObject(); + receivedData = AnonymousEventDataConverter.ConvertToLooseObject(d); await Task.CompletedTask; }))); await LocalEventBus.PublishAsync(eventName, new { Name = "Hello", Count = 42 }); receivedData.ShouldNotBeNull(); - var dict = receivedData.ShouldBeOfType>(); + var dict = receivedData.ShouldBeOfType>(); dict["Name"].ShouldBe("Hello"); dict["Count"].ShouldBe(42L); } @@ -148,13 +147,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase [Fact] public async Task Should_ConvertToTypedObject_Generic_In_Anonymous_Handler() { - MySimpleEventData? receivedData = null; + MySimpleEventData receivedData = null!; var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); using var subscription = LocalEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { - receivedData = d.ConvertToTypedObject(); + receivedData = AnonymousEventDataConverter.ConvertToTypedObject(d); await Task.CompletedTask; }))); @@ -163,4 +162,23 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase receivedData.ShouldNotBeNull(); receivedData.Value.ShouldBe(99); } + + [Fact] + public void Should_Roundtrip_Raw_Json_Without_Object_Deserialization() + { + var eventData = AnonymousEventData.FromJson("TestEvent", "{\"Value\":42,\"Name\":\"hello\"}"); + + eventData.EventName.ShouldBe("TestEvent"); + AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("{\"Value\":42,\"Name\":\"hello\"}"); + AnonymousEventDataConverter.ConvertToTypedObject(eventData).Value.ShouldBe(42); + } + + [Fact] + public void Should_Preserve_String_Payload_As_Raw_Json() + { + var eventData = AnonymousEventData.FromJson("TestEvent", "\"hello\""); + + AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("\"hello\""); + AnonymousEventDataConverter.ConvertToTypedObject(eventData).ShouldBe("hello"); + } }