From fb47069209640ed5fe296e8860752d38f4f7683c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?SAL=C4=B0H=20=C3=96ZKARA?= Date: Tue, 10 Mar 2026 20:00:34 +0300 Subject: [PATCH] Unify anonymous event handling across buses Add native anonymous event support and simplify handling across transports. AnonymousEventData now contains conversion helpers (ConvertToTypedObject/ConvertToTypedObject/ConvertToTypedObject -> loose typed object), caching JSON elements and replacing the removed AnonymousEventDataConverter. Multiple distributed event bus implementations (Azure, Dapr, Kafka, RabbitMQ, Rebus) were updated to: detect anonymous handlers via AnonymousHandlerFactories, construct AnonymousEventData when appropriate, resolve event types at publish/process time, simplify Subscribe/Unsubscribe logic (avoid duplicate-factory checks using IsInFactories then add), and throw on unknown event names in PublishAsync. AbpAspNetCoreMvcDaprEventBusModule was refactored to deserialize and trigger handlers inline for both envelope and direct Dapr events. Tests updated accordingly and a small cursor hook state file was added. --- .../AbpAspNetCoreMvcDaprEventBusModule.cs | 81 ++----- .../Volo/Abp/EventBus/AnonymousEventData.cs | 115 ++++++++-- .../EventBus/AnonymousEventDataConverter.cs | 95 -------- .../Azure/AzureDistributedEventBus.cs | 181 +++++----------- .../EventBus/Dapr/DaprDistributedEventBus.cs | 187 +++++----------- .../Kafka/KafkaDistributedEventBus.cs | 179 +++++---------- .../RabbitMq/RabbitMqDistributedEventBus.cs | 188 +++++----------- .../Rebus/RebusDistributedEventBus.cs | 203 ++++++------------ .../Distributed/DistributedEventBusBase.cs | 74 ++----- .../Distributed/LocalDistributedEventBus.cs | 164 ++++++-------- .../Volo/Abp/EventBus/EventBusBase.cs | 16 +- .../Volo/Abp/EventBus/Local/LocalEventBus.cs | 153 +++---------- .../LocalDistributedEventBus_Test.cs | 144 +------------ .../Local/LocalEventBus_Anonymous_Test.cs | 36 +--- 14 files changed, 531 insertions(+), 1285 deletions(-) delete mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs index 079ed1287a..f770f5fbb2 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text.Json; @@ -96,11 +96,30 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule if (IsAbpDaprEventData(data)) { - await TryHandleAbpDaprEnvelopeAsync(distributedEventBus, daprSerializer, data); + var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); + var eventType = distributedEventBus.GetEventType(daprEventData.Topic); + if (eventType != null) + { + var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType); + await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); + }else if (distributedEventBus.IsAnonymousEvent(daprEventData.Topic)) + { + var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object)); + await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId); + } } else { - await TryHandleDirectDaprEventAsync(distributedEventBus, daprSerializer, topic!, data); + var eventType = distributedEventBus.GetEventType(topic); + if (eventType != null) + { + var eventData = daprSerializer.Deserialize(data, eventType); + await distributedEventBus.TriggerHandlersAsync(eventType, eventData); + }else if (distributedEventBus.IsAnonymousEvent(topic)) + { + var eventData = daprSerializer.Deserialize(data, typeof(object)); + await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(topic, eventData)); + } } httpContext.Response.StatusCode = 200; @@ -117,60 +136,4 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) && objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase)); } - - private static async Task TryHandleAbpDaprEnvelopeAsync( - DaprDistributedEventBus distributedEventBus, - IDaprSerializer daprSerializer, - string data) - { - var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); - if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, daprEventData.Topic, daprEventData.JsonData, out var eventType, out var eventData)) - { - return; - } - - await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); - } - - private static async Task TryHandleDirectDaprEventAsync( - DaprDistributedEventBus distributedEventBus, - IDaprSerializer daprSerializer, - string topic, - string data) - { - if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, topic, data, out var eventType, out var eventData)) - { - return; - } - - await distributedEventBus.TriggerHandlersAsync(eventType, eventData); - } - - private static bool TryResolveIncomingEvent( - DaprDistributedEventBus distributedEventBus, - IDaprSerializer daprSerializer, - string topic, - string data, - out Type eventType, - out object eventData) - { - var typedEventType = distributedEventBus.GetEventType(topic); - if (typedEventType != null) - { - eventType = typedEventType; - eventData = daprSerializer.Deserialize(data, typedEventType); - return true; - } - - if (!distributedEventBus.IsAnonymousEvent(topic)) - { - eventType = default!; - eventData = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = AnonymousEventData.FromJson(topic, data); - return true; - } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs index 255b4cffff..1db318b40e 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs @@ -1,4 +1,8 @@ using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; + namespace Volo.Abp.EventBus; /// @@ -14,14 +18,11 @@ public class AnonymousEventData public string EventName { get; } /// - /// The raw event data payload. Can be a CLR object, , or any serializable object. + /// The raw event data payload. Can be a CLR object, , or any serializable object. /// - internal object Data { get; } + public object Data { get; } - /// - /// The raw JSON payload when the event is created from transport data. - /// - public string? JsonData { get; } + private JsonElement? _cachedJsonElement; /// /// Creates a new instance of . @@ -35,17 +36,105 @@ public class AnonymousEventData } /// - /// Creates a new instance of from raw JSON. + /// Converts the to a loosely-typed object graph + /// (dictionaries for objects, lists for arrays, primitives for values). /// - public static AnonymousEventData FromJson(string eventName, string jsonData) + /// A CLR object representation of the event data + public object ConvertToTypedObject() { - return new AnonymousEventData(eventName, data: null!, jsonData); + return ConvertElement(GetJsonElement()); } - private AnonymousEventData(string eventName, object data, string? jsonData) + /// + /// Converts the to a strongly-typed object. + /// Returns the data directly if it is already of type , + /// otherwise deserializes from JSON. + /// + /// Target type to convert to + /// The deserialized object of type + /// Thrown when deserialization fails + public T ConvertToTypedObject() { - EventName = eventName; - Data = data; - JsonData = jsonData; + if (Data is T typedData) + { + return typedData; + } + + return GetJsonElement().Deserialize() + ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}."); + } + + /// + /// Converts the to the specified . + /// Returns the data directly if it is already an instance of the target type, + /// otherwise deserializes from JSON. + /// + /// Target type to convert to + /// The deserialized object + /// Thrown when deserialization fails + public object ConvertToTypedObject(Type type) + { + if (type.IsInstanceOfType(Data)) + { + return Data; + } + + return GetJsonElement().Deserialize(type) + ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}."); + } + + private JsonElement GetJsonElement() + { + if (_cachedJsonElement.HasValue) + { + return _cachedJsonElement.Value; + } + + if (Data is JsonElement existingElement) + { + _cachedJsonElement = existingElement; + return existingElement; + } + + _cachedJsonElement = JsonSerializer.SerializeToElement(Data); + return _cachedJsonElement.Value; + } + + private static object ConvertElement(JsonElement element) + { + switch (element.ValueKind) + { + case JsonValueKind.Object: + { + var obj = new Dictionary(); + foreach (var property in element.EnumerateObject()) + { + obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null + ? null + : ConvertElement(property.Value); + } + return obj; + } + case JsonValueKind.Array: + return element.EnumerateArray() + .Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item)) + .ToList(); + case JsonValueKind.String: + return element.GetString()!; + case JsonValueKind.Number when element.TryGetInt64(out var longValue): + return longValue; + case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue): + return decimalValue; + case JsonValueKind.Number when element.TryGetDouble(out var doubleValue): + return doubleValue; + case JsonValueKind.True: + return true; + case JsonValueKind.False: + return false; + case JsonValueKind.Null: + case JsonValueKind.Undefined: + default: + return null!; + } } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs deleted file mode 100644 index e4b9234ccd..0000000000 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text.Json; - -namespace Volo.Abp.EventBus; - -public static class AnonymousEventDataConverter -{ - public static T ConvertToTypedObject(AnonymousEventData eventData) - { - if (eventData.Data is T typedData) - { - return typedData; - } - - return ParseJsonElement(eventData).Deserialize() - ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}."); - } - - public static object ConvertToTypedObject(AnonymousEventData eventData, Type type) - { - if (type.IsInstanceOfType(eventData.Data)) - { - return eventData.Data; - } - - return ParseJsonElement(eventData).Deserialize(type) - ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}."); - } - - public static object ConvertToLooseObject(AnonymousEventData eventData) - { - return ConvertElement(ParseJsonElement(eventData)); - } - - public static string GetJsonData(AnonymousEventData eventData) - { - return eventData.JsonData ?? ParseJsonElement(eventData).GetRawText(); - } - - private static JsonElement ParseJsonElement(AnonymousEventData eventData) - { - if (eventData.Data is JsonElement existingElement) - { - return existingElement; - } - - if (eventData.JsonData != null) - { - return JsonDocument.Parse(eventData.JsonData).RootElement.Clone(); - } - - return JsonSerializer.SerializeToElement(eventData.Data); - } - - private static object ConvertElement(JsonElement element) - { - switch (element.ValueKind) - { - case JsonValueKind.Object: - { - var obj = new Dictionary(); - foreach (var property in element.EnumerateObject()) - { - obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null - ? null - : ConvertElement(property.Value); - } - - return obj; - } - case JsonValueKind.Array: - return element.EnumerateArray() - .Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item)) - .ToList(); - case JsonValueKind.String: - return element.GetString()!; - case JsonValueKind.Number when element.TryGetInt64(out var longValue): - return longValue; - case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue): - return decimalValue; - case JsonValueKind.Number when element.TryGetDouble(out var doubleValue): - return doubleValue; - case JsonValueKind.True: - return true; - case JsonValueKind.False: - return false; - case JsonValueKind.Null: - case JsonValueKind.Undefined: - default: - return null!; - } - } -} diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 83eba73fa3..020757ad07 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Microsoft.Extensions.DependencyInjection; @@ -91,9 +92,10 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); } - else if (HasAnonymousHandlers(eventName)) + else if (AnonymousHandlerFactories.ContainsKey(eventName)) { - eventData = CreateAnonymousEventData(eventName, Serializer.Deserialize(message.Body.ToArray())); + var data = Serializer.Deserialize(message.Body.ToArray()); + eventData = new AnonymousEventData(eventName, data); eventType = typeof(AnonymousEventData); } else @@ -115,21 +117,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -137,21 +132,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -211,9 +199,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -288,7 +287,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + var element = Serializer.Deserialize(incomingEvent.EventData); + eventData = new AnonymousEventData(incomingEvent.EventName, element); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -388,45 +400,29 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -452,81 +448,6 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 13edc0106d..1231daf632 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -70,21 +70,14 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -92,21 +85,14 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -163,9 +149,20 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -181,7 +178,18 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) + { + eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object)); + } + else { return; } @@ -222,7 +230,19 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -307,51 +327,33 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public bool IsAnonymousEvent(string eventName) { - return HasAnonymousHandlers(eventName); + return AnonymousHandlerFactories.ContainsKey(eventName); } /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -377,81 +379,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index be82c603c1..e7e44e449d 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; @@ -91,9 +92,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen { eventData = Serializer.Deserialize(message.Value, eventType); } - else if (HasAnonymousHandlers(eventName)) + else if (AnonymousHandlerFactories.ContainsKey(eventName)) { - eventData = CreateAnonymousEventData(eventName, message.Value); + var element = Serializer.Deserialize(message.Value); + eventData = new AnonymousEventData(eventName, element); eventType = typeof(AnonymousEventData); } else @@ -115,21 +117,14 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -137,21 +132,14 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -212,9 +200,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected override async Task PublishToEventBusAsync(Type eventType, object eventData) @@ -326,7 +325,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + var element = Serializer.Deserialize(incomingEvent.EventData); + eventData = new AnonymousEventData(incomingEvent.EventName, element); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -424,45 +436,27 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -488,81 +482,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 097f2cbbe7..771dae9f09 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -109,10 +109,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis { eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); } - else if (HasAnonymousHandlers(eventName)) + else if (AnonymousHandlerFactories.ContainsKey(eventName)) { eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, ea.Body.ToArray()); + eventData = new AnonymousEventData(eventName, Serializer.Deserialize(ea.Body.ToArray())); } else { @@ -134,24 +134,15 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - if (isFirstHandler) + handlerFactories.Add(factory); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! { Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); } @@ -163,24 +154,15 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(handler); - added = true; - } - }); - - if (!added) + + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - - if (isFirstHandler) + + handlerFactories.Add(handler); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! { Consumer.BindAsync(eventName); } @@ -245,9 +227,20 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -312,7 +305,19 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData)); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -482,45 +487,27 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -546,81 +533,6 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index bf1bd1a585..c4f1a141e6 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -100,24 +100,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - if (isFirstHandler) + handlerFactories.Add(factory); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! { Rebus.Subscribe(eventType); } @@ -129,24 +120,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - if (isFirstHandler && AnonymousHandlerFactories.Count == 1) + handlerFactories.Add(handler); + + if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading! { Rebus.Subscribe(typeof(AnonymousEventData)); } @@ -211,9 +193,20 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -235,7 +228,19 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) + { + eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object))); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -268,6 +273,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { await PublishFromOutboxAsync(outgoingEvent, outboxConfig); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) + { + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } } await scope.CompleteAsync(); @@ -278,7 +292,19 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -367,45 +393,27 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -431,81 +439,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index a5ffb87245..88ce5662f8 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -113,9 +112,15 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB bool onUnitOfWorkComplete = true, bool useOutbox = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox) - ?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox); + var eventType = GetEventTypeByEventName(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox); + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); } public abstract Task PublishFromOutboxAsync( @@ -147,14 +152,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) { var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); - var (eventName, resolvedEventData) = ResolveEventForPublishing(eventType, eventData); + (var eventName, eventData) = ResolveEventForPublishing(eventType, eventData); - await OnAddToOutboxAsync(eventName, eventType, resolvedEventData); + await OnAddToOutboxAsync(eventName, eventType, eventData); var outgoingEventInfo = new OutgoingEventInfo( GuidGenerator.Create(), eventName, - SerializeEventData(resolvedEventData), + Serialize(eventData), Clock.Now ); @@ -209,11 +214,13 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB } } + eventData = GetEventData(eventData); + var incomingEventInfo = new IncomingEventInfo( GuidGenerator.Create(), messageId!, eventName, - SerializeEventData(eventData), + Serialize(eventData), Clock.Now ); incomingEventInfo.SetCorrelationId(correlationId!); @@ -228,55 +235,6 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB protected abstract byte[] Serialize(object eventData); - protected virtual byte[] SerializeEventData(object eventData) - { - if (eventData is AnonymousEventData anonymousEventData) - { - return Encoding.UTF8.GetBytes(AnonymousEventDataConverter.GetJsonData(anonymousEventData)); - } - - return Serialize(eventData); - } - - protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, byte[] eventData) - { - return AnonymousEventData.FromJson(eventName, Encoding.UTF8.GetString(eventData)); - } - - protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, string eventData) - { - return AnonymousEventData.FromJson(eventName, eventData); - } - - protected virtual AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - protected virtual Task? TryPublishTypedByEventNameAsync( - string eventName, - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - var eventType = GetEventTypeByEventName(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox); - } - - protected virtual Task PublishAnonymousByEventNameAsync( - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); - } - protected virtual async Task TriggerHandlersDirectAsync(Type eventType, object eventData) { await TriggerDistributedEventReceivedAsync(new DistributedEventReceived @@ -339,7 +297,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { if (eventData is AnonymousEventData anonymousEventData) { - return AnonymousEventDataConverter.ConvertToLooseObject(anonymousEventData); + return anonymousEventData.Data; } return eventData; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index ede70885ff..c78b30260e 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -3,7 +3,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; -using System.Text; using System.Text.Json; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -77,8 +76,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { AnonymousEventNames.GetOrAdd(eventName, true); - LocalEventBus.Subscribe(eventName, handler); - return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); + return LocalEventBus.Subscribe(eventName, handler); } /// @@ -86,8 +84,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var eventName = EventNameAttribute.GetNameOrDefault(eventType); EventTypes.GetOrAdd(eventName, eventType); - LocalEventBus.Subscribe(eventType, factory); - return new EventHandlerFactoryUnregistrar(this, eventType, factory); + return LocalEventBus.Subscribe(eventType, factory); } public override void Unsubscribe(Func action) @@ -109,14 +106,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { LocalEventBus.Unsubscribe(eventName, factory); - CleanupAnonymousEventName(eventName); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { LocalEventBus.Unsubscribe(eventName, handler); - CleanupAnonymousEventName(eventName); } /// @@ -129,7 +124,6 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override void UnsubscribeAll(string eventName) { LocalEventBus.UnsubscribeAll(eventName); - CleanupAnonymousEventName(eventName); } /// @@ -178,14 +172,25 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox) - ?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox); + } + + if (!AnonymousEventNames.ContainsKey(eventName)) + { + throw new AbpException($"Unknown event name: {eventName}"); + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - if (await AddToInboxAsync(Guid.NewGuid().ToString(), GetEventName(eventType, eventData), eventType, eventData, null)) + if (await AddToInboxAsync(Guid.NewGuid().ToString(), EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, null)) { return; } @@ -214,12 +219,30 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); - if (!TryResolveStoredEventType(outgoingEvent.EventName, out var eventType)) + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + if (eventType == null) { - return; + var isAnonymous = AnonymousEventNames.ContainsKey(outgoingEvent.EventName); + if (!isAnonymous) + { + return; + } + + eventType = typeof(AnonymousEventData); + } + + object eventData; + if (eventType == typeof(AnonymousEventData)) + { + eventData = new AnonymousEventData( + outgoingEvent.EventName, + JsonSerializer.Deserialize(outgoingEvent.EventData)!); + } + else + { + eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType)!; } - var eventData = DeserializeStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, eventType); if (await AddToInboxAsync(Guid.NewGuid().ToString(), outgoingEvent.EventName, eventType, eventData, null)) { return; @@ -238,12 +261,30 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventType(incomingEvent.EventName, out var eventType)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) { - return; + var isAnonymous = AnonymousEventNames.ContainsKey(incomingEvent.EventName); + if (!isAnonymous) + { + return; + } + + eventType = typeof(AnonymousEventData); + } + + object eventData; + if (eventType == typeof(AnonymousEventData)) + { + eventData = new AnonymousEventData( + incomingEvent.EventName, + JsonSerializer.Deserialize(incomingEvent.EventData)!); + } + else + { + eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType)!; } - var eventData = DeserializeStoredEventData(incomingEvent.EventName, incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -257,7 +298,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected override byte[] Serialize(object eventData) { - return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventData)); + return JsonSerializer.SerializeToUtf8Bytes(eventData); } protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) @@ -283,89 +324,4 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return EventTypes.GetOrDefault(eventName); } - - protected override AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - protected override Task? TryPublishTypedByEventNameAsync( - string eventName, - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox); - } - - protected override Task PublishAnonymousByEventNameAsync( - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - if (!HasAnonymousEventName(anonymousEventData.EventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); - } - - protected virtual bool TryResolveStoredEventType(string eventName, out Type eventType) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - return true; - } - - if (!HasAnonymousEventName(eventName)) - { - return false; - } - - eventType = typeof(AnonymousEventData); - return true; - } - - protected virtual object DeserializeStoredEventData(string eventName, byte[] eventData, Type eventType) - { - if (eventType == typeof(AnonymousEventData)) - { - return CreateAnonymousEventData(eventName, eventData); - } - - return JsonSerializer.Deserialize(Encoding.UTF8.GetString(eventData), eventType)!; - } - - protected virtual void CleanupAnonymousEventName(string eventName) - { - if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any()) - { - AnonymousEventNames.TryRemove(eventName, out _); - } - } - - protected virtual bool HasAnonymousEventName(string eventName) - { - if (!AnonymousEventNames.ContainsKey(eventName)) - { - return false; - } - - if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any()) - { - AnonymousEventNames.TryRemove(eventName, out _); - return false; - } - - return true; - } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index b4b589d6dd..8c344f5aa7 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -174,7 +174,9 @@ public abstract class EventBusBase : IEventBus actualEventType.GetGenericArguments().Length == 1 && typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType)) { - var resolvedEventData = ResolveActualEventData(eventData, actualEventType); + var resolvedEventData = eventData is AnonymousEventData aed + ? aed.ConvertToTypedObject(actualEventType) + : eventData; var genericArg = actualEventType.GetGenericArguments()[0]; var baseArg = genericArg.GetTypeInfo().BaseType; @@ -207,7 +209,7 @@ public abstract class EventBusBase : IEventBus { if (eventData is AnonymousEventData anonymousEventData && handlerEventType != typeof(AnonymousEventData)) { - return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, handlerEventType); + return anonymousEventData.ConvertToTypedObject(handlerEventType); } if (handlerEventType == typeof(AnonymousEventData) && eventData is not AnonymousEventData) @@ -218,16 +220,6 @@ public abstract class EventBusBase : IEventBus return eventData; } - protected virtual object ResolveActualEventData(object eventData, Type actualEventType) - { - if (eventData is AnonymousEventData anonymousEventData) - { - return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, actualEventType); - } - - return eventData; - } - protected void ThrowOriginalExceptions(Type eventType, List exceptions) { if (exceptions.Count == 1) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 3281b95bd5..989191528c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -60,8 +60,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { - var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - handlerFactories.Locking(factories => + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => { if (!handler.IsInFactories(factories)) { @@ -141,33 +140,21 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// @@ -179,21 +166,28 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override void UnsubscribeAll(string eventName) { - if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + var isAnonymous = AnonymousEventHandlerFactories.ContainsKey(eventName); + if (!isAnonymous) + { + throw new AbpException($"Unknown event name: {eventName}"); + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); } protected override async Task PublishToEventBusAsync(Type eventType, object eventData) @@ -231,11 +225,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { - var handlerType = GetHandlerType(factory); handlerFactoryList.Add(new Tuple( factory, handlerFactory.Key, - ReflectionHelper.GetAttributesOfMemberOrDeclaringType(handlerType).FirstOrDefault()?.Order ?? 0)); + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); } } @@ -243,11 +236,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { - var handlerType = GetHandlerType(factory); handlerFactoryList.Add(new Tuple( factory, typeof(AnonymousEventData), - ReflectionHelper.GetAttributesOfMemberOrDeclaringType(handlerType).FirstOrDefault()?.Order ?? 0)); + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); } } @@ -268,7 +260,8 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { - var handlerType = GetHandlerType(factory); + using var handler = factory.GetHandler(); + var handlerType = handler.EventHandler.GetType(); handlerFactoryList.Add(new Tuple( factory, typeof(AnonymousEventData), @@ -297,88 +290,6 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency return AnonymousEventHandlerFactories.GetOrAdd(eventName, (name) => new List()); } - private bool TryGetAnonymousHandlerFactories(string eventName, out List handlerFactories) - { - return AnonymousEventHandlerFactories.TryGetValue(eventName, out handlerFactories!); - } - - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private static Type GetHandlerType(IEventHandlerFactory factory) - { - if (factory is SingleInstanceHandlerFactory singleInstanceHandlerFactory) - { - return singleInstanceHandlerFactory.HandlerInstance.GetType(); - } - - if (factory is IocEventHandlerFactory iocEventHandlerFactory) - { - return iocEventHandlerFactory.HandlerType; - } - - if (factory is TransientEventHandlerFactory transientEventHandlerFactory) - { - return transientEventHandlerFactory.HandlerType; - } - - using var handler = factory.GetHandler(); - return handler.EventHandler.GetType(); - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousEventHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - - if (!hasHandlers) - { - AnonymousEventHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - - if (isEmpty) - { - AnonymousEventHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { if (handlerEventType == targetEventType) diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs index 673454c6be..5ceb4b1bb5 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Text; using System.Threading.Tasks; using Shouldly; using Volo.Abp.Domain.Entities.Events.Distributed; @@ -22,32 +21,20 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase [Fact] public async Task Should_Call_Handler_AndDispose() { - var handleCount = 0; - var disposeCount = 0; - var factory = new CountingDistributedEventHandlerFactory( - () => handleCount++, - () => disposeCount++); - - using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory); + using var subscription = DistributedEventBus.Subscribe(); await DistributedEventBus.PublishAsync(new MySimpleEventData(1)); await DistributedEventBus.PublishAsync(new MySimpleEventData(2)); await DistributedEventBus.PublishAsync(new MySimpleEventData(3)); - Assert.Equal(3, handleCount); - Assert.Equal(3, disposeCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); } [Fact] public async Task Should_Handle_Typed_Handler_When_Published_With_EventName() { - var handleCount = 0; - var disposeCount = 0; - var factory = new CountingDistributedEventHandlerFactory( - () => handleCount++, - () => disposeCount++); - - using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory); + using var subscription = DistributedEventBus.Subscribe(); var eventName = EventNameAttribute.GetNameOrDefault(); await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1)); @@ -57,8 +44,8 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase }); await DistributedEventBus.PublishAsync(eventName, new { Value = 3 }); - Assert.Equal(3, handleCount); - Assert.Equal(3, disposeCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); } [Fact] @@ -95,7 +82,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { handleCount++; - AnonymousEventDataConverter.ConvertToLooseObject(d).ShouldNotBeNull(); + d.ConvertToTypedObject().ShouldNotBeNull(); await Task.CompletedTask; }))); @@ -200,15 +187,16 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } [Fact] - public async Task Should_Ignore_Unknown_Event_Name() + public async Task Should_Throw_For_Unknown_Event_Name() { - await DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); + await Assert.ThrowsAsync(() => + DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 })); } [Fact] public async Task Should_Convert_AnonymousEventData_To_Typed_Object() { - MySimpleEventData receivedData = null!; + MySimpleEventData? receivedData = null; using var subscription = DistributedEventBus.Subscribe(async (data) => { @@ -223,63 +211,6 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase receivedData.Value.ShouldBe(42); } - [Fact] - public async Task Should_Rehydrate_Anonymous_Event_From_Outbox_Using_Raw_Json() - { - var localDistributedEventBus = GetRequiredService(); - AnonymousEventData receivedData = null!; - var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); - - using var subscription = localDistributedEventBus.Subscribe(eventName, - new SingleInstanceHandlerFactory(new ActionEventHandler(async data => - { - receivedData = data; - await Task.CompletedTask; - }))); - - var outgoingEvent = new OutgoingEventInfo( - Guid.NewGuid(), - eventName, - Encoding.UTF8.GetBytes("{\"Value\":42}"), - DateTime.UtcNow); - - await localDistributedEventBus.PublishFromOutboxAsync(outgoingEvent, new OutboxConfig("Test") { DatabaseName = "Test" }); - - receivedData.ShouldNotBeNull(); - receivedData.EventName.ShouldBe(eventName); - AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("{\"Value\":42}"); - AnonymousEventDataConverter.ConvertToTypedObject(receivedData).Value.ShouldBe(42); - } - - [Fact] - public async Task Should_Rehydrate_Anonymous_Event_From_Inbox_Using_Raw_Json() - { - var localDistributedEventBus = GetRequiredService(); - AnonymousEventData receivedData = null!; - var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); - - using var subscription = localDistributedEventBus.Subscribe(eventName, - new SingleInstanceHandlerFactory(new ActionEventHandler(async data => - { - receivedData = data; - await Task.CompletedTask; - }))); - - var incomingEvent = new IncomingEventInfo( - Guid.NewGuid(), - Guid.NewGuid().ToString("N"), - eventName, - Encoding.UTF8.GetBytes("\"hello\""), - DateTime.UtcNow); - - await localDistributedEventBus.ProcessFromInboxAsync(incomingEvent, new InboxConfig("Test") { DatabaseName = "Test" }); - - receivedData.ShouldNotBeNull(); - receivedData.EventName.ShouldBe(eventName); - AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("\"hello\""); - AnonymousEventDataConverter.ConvertToTypedObject(receivedData).ShouldBe("hello"); - } - [Fact] public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant() { @@ -378,57 +309,4 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } } - private sealed class CountingDistributedEventHandlerFactory : IEventHandlerFactory - { - private readonly Action _handleAction; - private readonly Action _disposeAction; - - public CountingDistributedEventHandlerFactory(Action handleAction, Action disposeAction) - { - _handleAction = handleAction; - _disposeAction = disposeAction; - } - - public IEventHandlerDisposeWrapper GetHandler() - { - var wasHandled = false; - return new EventHandlerDisposeWrapper( - new CountingDistributedEventHandler( - _handleAction, - () => wasHandled = true), - () => - { - if (wasHandled) - { - _disposeAction(); - } - } - ); - } - - public bool IsInFactories(List handlerFactories) - { - return handlerFactories.Contains(this); - } - } - - private sealed class CountingDistributedEventHandler : IDistributedEventHandler - { - private readonly Action _handleAction; - private readonly Action _markHandled; - - public CountingDistributedEventHandler(Action handleAction, Action markHandled) - { - _handleAction = handleAction; - _markHandled = markHandled; - } - - public Task HandleEventAsync(MySimpleEventData eventData) - { - _markHandled(); - _handleAction(); - return Task.CompletedTask; - } - } - } diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs index dfcd74bed8..7d3e5748e9 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs @@ -48,7 +48,7 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase [Fact] public async Task Should_Convert_Dictionary_To_Typed_Handler() { - MySimpleEventData receivedData = null!; + MySimpleEventData? receivedData = null; using var subscription = LocalEventBus.Subscribe(async (data) => { @@ -118,28 +118,29 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase } [Fact] - public async Task Should_Ignore_Unknown_Event_Name() + public async Task Should_Throw_For_Unknown_Event_Name() { - await LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); + await Assert.ThrowsAsync(() => + LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 })); } [Fact] public async Task Should_ConvertToTypedObject_In_Anonymous_Handler() { - object receivedData = null!; + object? receivedData = null; var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); using var subscription = LocalEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { - receivedData = AnonymousEventDataConverter.ConvertToLooseObject(d); + receivedData = d.ConvertToTypedObject(); await Task.CompletedTask; }))); await LocalEventBus.PublishAsync(eventName, new { Name = "Hello", Count = 42 }); receivedData.ShouldNotBeNull(); - var dict = receivedData.ShouldBeOfType>(); + var dict = receivedData.ShouldBeOfType>(); dict["Name"].ShouldBe("Hello"); dict["Count"].ShouldBe(42L); } @@ -147,13 +148,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase [Fact] public async Task Should_ConvertToTypedObject_Generic_In_Anonymous_Handler() { - MySimpleEventData receivedData = null!; + MySimpleEventData? receivedData = null; var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); using var subscription = LocalEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { - receivedData = AnonymousEventDataConverter.ConvertToTypedObject(d); + receivedData = d.ConvertToTypedObject(); await Task.CompletedTask; }))); @@ -162,23 +163,4 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase receivedData.ShouldNotBeNull(); receivedData.Value.ShouldBe(99); } - - [Fact] - public void Should_Roundtrip_Raw_Json_Without_Object_Deserialization() - { - var eventData = AnonymousEventData.FromJson("TestEvent", "{\"Value\":42,\"Name\":\"hello\"}"); - - eventData.EventName.ShouldBe("TestEvent"); - AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("{\"Value\":42,\"Name\":\"hello\"}"); - AnonymousEventDataConverter.ConvertToTypedObject(eventData).Value.ShouldBe(42); - } - - [Fact] - public void Should_Preserve_String_Payload_As_Raw_Json() - { - var eventData = AnonymousEventData.FromJson("TestEvent", "\"hello\""); - - AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("\"hello\""); - AnonymousEventDataConverter.ConvertToTypedObject(eventData).ShouldBe("hello"); - } }