diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs index 079ed1287a..f770f5fbb2 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text.Json; @@ -96,11 +96,30 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule if (IsAbpDaprEventData(data)) { - await TryHandleAbpDaprEnvelopeAsync(distributedEventBus, daprSerializer, data); + var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); + var eventType = distributedEventBus.GetEventType(daprEventData.Topic); + if (eventType != null) + { + var eventData = daprSerializer.Deserialize(daprEventData.JsonData, eventType); + await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); + }else if (distributedEventBus.IsAnonymousEvent(daprEventData.Topic)) + { + var eventData = daprSerializer.Deserialize(daprEventData.JsonData, typeof(object)); + await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(daprEventData.Topic, eventData), daprEventData.MessageId, daprEventData.CorrelationId); + } } else { - await TryHandleDirectDaprEventAsync(distributedEventBus, daprSerializer, topic!, data); + var eventType = distributedEventBus.GetEventType(topic); + if (eventType != null) + { + var eventData = daprSerializer.Deserialize(data, eventType); + await distributedEventBus.TriggerHandlersAsync(eventType, eventData); + }else if (distributedEventBus.IsAnonymousEvent(topic)) + { + var eventData = daprSerializer.Deserialize(data, typeof(object)); + await distributedEventBus.TriggerHandlersAsync(typeof(AnonymousEventData), new AnonymousEventData(topic, eventData)); + } } httpContext.Response.StatusCode = 200; @@ -117,60 +136,4 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) && objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase)); } - - private static async Task TryHandleAbpDaprEnvelopeAsync( - DaprDistributedEventBus distributedEventBus, - IDaprSerializer daprSerializer, - string data) - { - var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); - if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, daprEventData.Topic, daprEventData.JsonData, out var eventType, out var eventData)) - { - return; - } - - await distributedEventBus.TriggerHandlersAsync(eventType, eventData, daprEventData.MessageId, daprEventData.CorrelationId); - } - - private static async Task TryHandleDirectDaprEventAsync( - DaprDistributedEventBus distributedEventBus, - IDaprSerializer daprSerializer, - string topic, - string data) - { - if (!TryResolveIncomingEvent(distributedEventBus, daprSerializer, topic, data, out var eventType, out var eventData)) - { - return; - } - - await distributedEventBus.TriggerHandlersAsync(eventType, eventData); - } - - private static bool TryResolveIncomingEvent( - DaprDistributedEventBus distributedEventBus, - IDaprSerializer daprSerializer, - string topic, - string data, - out Type eventType, - out object eventData) - { - var typedEventType = distributedEventBus.GetEventType(topic); - if (typedEventType != null) - { - eventType = typedEventType; - eventData = daprSerializer.Deserialize(data, typedEventType); - return true; - } - - if (!distributedEventBus.IsAnonymousEvent(topic)) - { - eventType = default!; - eventData = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = AnonymousEventData.FromJson(topic, data); - return true; - } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs index 255b4cffff..1db318b40e 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventData.cs @@ -1,4 +1,8 @@ using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; + namespace Volo.Abp.EventBus; /// @@ -14,14 +18,11 @@ public class AnonymousEventData public string EventName { get; } /// - /// The raw event data payload. Can be a CLR object, , or any serializable object. + /// The raw event data payload. Can be a CLR object, , or any serializable object. /// - internal object Data { get; } + public object Data { get; } - /// - /// The raw JSON payload when the event is created from transport data. - /// - public string? JsonData { get; } + private JsonElement? _cachedJsonElement; /// /// Creates a new instance of . @@ -35,17 +36,105 @@ public class AnonymousEventData } /// - /// Creates a new instance of from raw JSON. + /// Converts the to a loosely-typed object graph + /// (dictionaries for objects, lists for arrays, primitives for values). /// - public static AnonymousEventData FromJson(string eventName, string jsonData) + /// A CLR object representation of the event data + public object ConvertToTypedObject() { - return new AnonymousEventData(eventName, data: null!, jsonData); + return ConvertElement(GetJsonElement()); } - private AnonymousEventData(string eventName, object data, string? jsonData) + /// + /// Converts the to a strongly-typed object. + /// Returns the data directly if it is already of type , + /// otherwise deserializes from JSON. + /// + /// Target type to convert to + /// The deserialized object of type + /// Thrown when deserialization fails + public T ConvertToTypedObject() { - EventName = eventName; - Data = data; - JsonData = jsonData; + if (Data is T typedData) + { + return typedData; + } + + return GetJsonElement().Deserialize() + ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}."); + } + + /// + /// Converts the to the specified . + /// Returns the data directly if it is already an instance of the target type, + /// otherwise deserializes from JSON. + /// + /// Target type to convert to + /// The deserialized object + /// Thrown when deserialization fails + public object ConvertToTypedObject(Type type) + { + if (type.IsInstanceOfType(Data)) + { + return Data; + } + + return GetJsonElement().Deserialize(type) + ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}."); + } + + private JsonElement GetJsonElement() + { + if (_cachedJsonElement.HasValue) + { + return _cachedJsonElement.Value; + } + + if (Data is JsonElement existingElement) + { + _cachedJsonElement = existingElement; + return existingElement; + } + + _cachedJsonElement = JsonSerializer.SerializeToElement(Data); + return _cachedJsonElement.Value; + } + + private static object ConvertElement(JsonElement element) + { + switch (element.ValueKind) + { + case JsonValueKind.Object: + { + var obj = new Dictionary(); + foreach (var property in element.EnumerateObject()) + { + obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null + ? null + : ConvertElement(property.Value); + } + return obj; + } + case JsonValueKind.Array: + return element.EnumerateArray() + .Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item)) + .ToList(); + case JsonValueKind.String: + return element.GetString()!; + case JsonValueKind.Number when element.TryGetInt64(out var longValue): + return longValue; + case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue): + return decimalValue; + case JsonValueKind.Number when element.TryGetDouble(out var doubleValue): + return doubleValue; + case JsonValueKind.True: + return true; + case JsonValueKind.False: + return false; + case JsonValueKind.Null: + case JsonValueKind.Undefined: + default: + return null!; + } } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs deleted file mode 100644 index e4b9234ccd..0000000000 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/AnonymousEventDataConverter.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text.Json; - -namespace Volo.Abp.EventBus; - -public static class AnonymousEventDataConverter -{ - public static T ConvertToTypedObject(AnonymousEventData eventData) - { - if (eventData.Data is T typedData) - { - return typedData; - } - - return ParseJsonElement(eventData).Deserialize() - ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {typeof(T).FullName}."); - } - - public static object ConvertToTypedObject(AnonymousEventData eventData, Type type) - { - if (type.IsInstanceOfType(eventData.Data)) - { - return eventData.Data; - } - - return ParseJsonElement(eventData).Deserialize(type) - ?? throw new InvalidOperationException($"Failed to deserialize AnonymousEventData to {type.FullName}."); - } - - public static object ConvertToLooseObject(AnonymousEventData eventData) - { - return ConvertElement(ParseJsonElement(eventData)); - } - - public static string GetJsonData(AnonymousEventData eventData) - { - return eventData.JsonData ?? ParseJsonElement(eventData).GetRawText(); - } - - private static JsonElement ParseJsonElement(AnonymousEventData eventData) - { - if (eventData.Data is JsonElement existingElement) - { - return existingElement; - } - - if (eventData.JsonData != null) - { - return JsonDocument.Parse(eventData.JsonData).RootElement.Clone(); - } - - return JsonSerializer.SerializeToElement(eventData.Data); - } - - private static object ConvertElement(JsonElement element) - { - switch (element.ValueKind) - { - case JsonValueKind.Object: - { - var obj = new Dictionary(); - foreach (var property in element.EnumerateObject()) - { - obj[property.Name] = property.Value.ValueKind == JsonValueKind.Null - ? null - : ConvertElement(property.Value); - } - - return obj; - } - case JsonValueKind.Array: - return element.EnumerateArray() - .Select(item => item.ValueKind == JsonValueKind.Null ? null : (object?)ConvertElement(item)) - .ToList(); - case JsonValueKind.String: - return element.GetString()!; - case JsonValueKind.Number when element.TryGetInt64(out var longValue): - return longValue; - case JsonValueKind.Number when element.TryGetDecimal(out var decimalValue): - return decimalValue; - case JsonValueKind.Number when element.TryGetDouble(out var doubleValue): - return doubleValue; - case JsonValueKind.True: - return true; - case JsonValueKind.False: - return false; - case JsonValueKind.Null: - case JsonValueKind.Undefined: - default: - return null!; - } - } -} diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 83eba73fa3..020757ad07 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Microsoft.Extensions.DependencyInjection; @@ -91,9 +92,10 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); } - else if (HasAnonymousHandlers(eventName)) + else if (AnonymousHandlerFactories.ContainsKey(eventName)) { - eventData = CreateAnonymousEventData(eventName, Serializer.Deserialize(message.Body.ToArray())); + var data = Serializer.Deserialize(message.Body.ToArray()); + eventData = new AnonymousEventData(eventName, data); eventType = typeof(AnonymousEventData); } else @@ -115,21 +117,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -137,21 +132,14 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -211,9 +199,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -288,7 +287,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + var element = Serializer.Deserialize(incomingEvent.EventData); + eventData = new AnonymousEventData(incomingEvent.EventName, element); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -388,45 +400,29 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -452,81 +448,6 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 13edc0106d..1231daf632 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -70,21 +70,14 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -92,21 +85,14 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -163,9 +149,20 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -181,7 +178,18 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) + { + eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object)); + } + else { return; } @@ -222,7 +230,19 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -307,51 +327,33 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public bool IsAnonymousEvent(string eventName) { - return HasAnonymousHandlers(eventName); + return AnonymousHandlerFactories.ContainsKey(eventName); } /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -377,81 +379,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index be82c603c1..e7e44e449d 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; @@ -91,9 +92,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen { eventData = Serializer.Deserialize(message.Value, eventType); } - else if (HasAnonymousHandlers(eventName)) + else if (AnonymousHandlerFactories.ContainsKey(eventName)) { - eventData = CreateAnonymousEventData(eventName, message.Value); + var element = Serializer.Deserialize(message.Value); + eventData = new AnonymousEventData(eventName, element); eventType = typeof(AnonymousEventData); } else @@ -115,21 +117,14 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(factory); + return new EventHandlerFactoryUnregistrar(this, eventType, factory); } @@ -137,21 +132,14 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } + handlerFactories.Add(handler); + return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -212,9 +200,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected override async Task PublishToEventBusAsync(Type eventType, object eventData) @@ -326,7 +325,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + var element = Serializer.Deserialize(incomingEvent.EventData); + eventData = new AnonymousEventData(incomingEvent.EventName, element); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -424,45 +436,27 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -488,81 +482,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 097f2cbbe7..771dae9f09 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -109,10 +109,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis { eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); } - else if (HasAnonymousHandlers(eventName)) + else if (AnonymousHandlerFactories.ContainsKey(eventName)) { eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, ea.Body.ToArray()); + eventData = new AnonymousEventData(eventName, Serializer.Deserialize(ea.Body.ToArray())); } else { @@ -134,24 +134,15 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - if (isFirstHandler) + handlerFactories.Add(factory); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! { Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); } @@ -163,24 +154,15 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(handler); - added = true; - } - }); - - if (!added) + + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - - if (isFirstHandler) + + handlerFactories.Add(handler); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! { Consumer.BindAsync(eventName); } @@ -245,9 +227,20 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -312,7 +305,19 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData)); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -482,45 +487,27 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -546,81 +533,6 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index bf1bd1a585..c4f1a141e6 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -100,24 +100,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!factory.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(factory); - added = true; - } - }); - if (!added) + if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - if (isFirstHandler) + handlerFactories.Add(factory); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! { Rebus.Subscribe(eventType); } @@ -129,24 +120,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - var added = false; - var isFirstHandler = false; - handlerFactories.Locking(factories => - { - if (!handler.IsInFactories(factories)) - { - isFirstHandler = factories.Count == 0; - factories.Add(handler); - added = true; - } - }); - if (!added) + if (handler.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } - if (isFirstHandler && AnonymousHandlerFactories.Count == 1) + handlerFactories.Add(handler); + + if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading! { Rebus.Subscribe(typeof(AnonymousEventData)); } @@ -211,9 +193,20 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + if (AnonymousHandlerFactories.ContainsKey(eventName)) + { + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); + } + + throw new AbpException($"Unknown event name: {eventName}"); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -235,7 +228,19 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - if (!TryResolveStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) + { + eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object))); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -268,6 +273,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { await PublishFromOutboxAsync(outgoingEvent, outboxConfig); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) + { + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } } await scope.CompleteAsync(); @@ -278,7 +292,19 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventData(incomingEvent.EventName, incomingEvent.EventData, out var eventType, out var eventData)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (AnonymousHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new AnonymousEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); + eventType = typeof(AnonymousEventData); + } + else { return; } @@ -367,45 +393,27 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// public override void UnsubscribeAll(string eventName) { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } protected override IEnumerable GetAnonymousHandlerFactories(string eventName) @@ -431,81 +439,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return AnonymousHandlerFactories.GetOrAdd(eventName, _ => new List()); } - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private bool TryResolveStoredEventData(string eventName, byte[] payload, out Type eventType, out object eventData) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - eventData = Serializer.Deserialize(payload, eventType); - return true; - } - - if (!HasAnonymousHandlers(eventName)) - { - eventData = default!; - eventType = default!; - return false; - } - - eventType = typeof(AnonymousEventData); - eventData = CreateAnonymousEventData(eventName, payload); - return true; - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - if (!hasHandlers) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - if (isEmpty) - { - AnonymousHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index a5ffb87245..88ce5662f8 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -113,9 +112,15 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB bool onUnitOfWorkComplete = true, bool useOutbox = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox) - ?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox); + var eventType = GetEventTypeByEventName(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox); + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); } public abstract Task PublishFromOutboxAsync( @@ -147,14 +152,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) { var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); - var (eventName, resolvedEventData) = ResolveEventForPublishing(eventType, eventData); + (var eventName, eventData) = ResolveEventForPublishing(eventType, eventData); - await OnAddToOutboxAsync(eventName, eventType, resolvedEventData); + await OnAddToOutboxAsync(eventName, eventType, eventData); var outgoingEventInfo = new OutgoingEventInfo( GuidGenerator.Create(), eventName, - SerializeEventData(resolvedEventData), + Serialize(eventData), Clock.Now ); @@ -209,11 +214,13 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB } } + eventData = GetEventData(eventData); + var incomingEventInfo = new IncomingEventInfo( GuidGenerator.Create(), messageId!, eventName, - SerializeEventData(eventData), + Serialize(eventData), Clock.Now ); incomingEventInfo.SetCorrelationId(correlationId!); @@ -228,55 +235,6 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB protected abstract byte[] Serialize(object eventData); - protected virtual byte[] SerializeEventData(object eventData) - { - if (eventData is AnonymousEventData anonymousEventData) - { - return Encoding.UTF8.GetBytes(AnonymousEventDataConverter.GetJsonData(anonymousEventData)); - } - - return Serialize(eventData); - } - - protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, byte[] eventData) - { - return AnonymousEventData.FromJson(eventName, Encoding.UTF8.GetString(eventData)); - } - - protected virtual AnonymousEventData CreateAnonymousEventData(string eventName, string eventData) - { - return AnonymousEventData.FromJson(eventName, eventData); - } - - protected virtual AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - protected virtual Task? TryPublishTypedByEventNameAsync( - string eventName, - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - var eventType = GetEventTypeByEventName(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox); - } - - protected virtual Task PublishAnonymousByEventNameAsync( - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); - } - protected virtual async Task TriggerHandlersDirectAsync(Type eventType, object eventData) { await TriggerDistributedEventReceivedAsync(new DistributedEventReceived @@ -339,7 +297,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { if (eventData is AnonymousEventData anonymousEventData) { - return AnonymousEventDataConverter.ConvertToLooseObject(anonymousEventData); + return anonymousEventData.Data; } return eventData; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index ede70885ff..c78b30260e 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -3,7 +3,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; -using System.Text; using System.Text.Json; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -77,8 +76,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { AnonymousEventNames.GetOrAdd(eventName, true); - LocalEventBus.Subscribe(eventName, handler); - return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); + return LocalEventBus.Subscribe(eventName, handler); } /// @@ -86,8 +84,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var eventName = EventNameAttribute.GetNameOrDefault(eventType); EventTypes.GetOrAdd(eventName, eventType); - LocalEventBus.Subscribe(eventType, factory); - return new EventHandlerFactoryUnregistrar(this, eventType, factory); + return LocalEventBus.Subscribe(eventType, factory); } public override void Unsubscribe(Func action) @@ -109,14 +106,12 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { LocalEventBus.Unsubscribe(eventName, factory); - CleanupAnonymousEventName(eventName); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { LocalEventBus.Unsubscribe(eventName, handler); - CleanupAnonymousEventName(eventName); } /// @@ -129,7 +124,6 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public override void UnsubscribeAll(string eventName) { LocalEventBus.UnsubscribeAll(eventName); - CleanupAnonymousEventName(eventName); } /// @@ -178,14 +172,25 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete, useOutbox) - ?? PublishAnonymousByEventNameAsync(anonymousEventData, onUnitOfWorkComplete, useOutbox); + var eventType = EventTypes.GetOrDefault(eventName); + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete, useOutbox); + } + + if (!AnonymousEventNames.ContainsKey(eventName)) + { + throw new AbpException($"Unknown event name: {eventName}"); + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - if (await AddToInboxAsync(Guid.NewGuid().ToString(), GetEventName(eventType, eventData), eventType, eventData, null)) + if (await AddToInboxAsync(Guid.NewGuid().ToString(), EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, null)) { return; } @@ -214,12 +219,30 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); - if (!TryResolveStoredEventType(outgoingEvent.EventName, out var eventType)) + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + if (eventType == null) { - return; + var isAnonymous = AnonymousEventNames.ContainsKey(outgoingEvent.EventName); + if (!isAnonymous) + { + return; + } + + eventType = typeof(AnonymousEventData); + } + + object eventData; + if (eventType == typeof(AnonymousEventData)) + { + eventData = new AnonymousEventData( + outgoingEvent.EventName, + JsonSerializer.Deserialize(outgoingEvent.EventData)!); + } + else + { + eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType)!; } - var eventData = DeserializeStoredEventData(outgoingEvent.EventName, outgoingEvent.EventData, eventType); if (await AddToInboxAsync(Guid.NewGuid().ToString(), outgoingEvent.EventName, eventType, eventData, null)) { return; @@ -238,12 +261,30 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - if (!TryResolveStoredEventType(incomingEvent.EventName, out var eventType)) + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) { - return; + var isAnonymous = AnonymousEventNames.ContainsKey(incomingEvent.EventName); + if (!isAnonymous) + { + return; + } + + eventType = typeof(AnonymousEventData); + } + + object eventData; + if (eventType == typeof(AnonymousEventData)) + { + eventData = new AnonymousEventData( + incomingEvent.EventName, + JsonSerializer.Deserialize(incomingEvent.EventData)!); + } + else + { + eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType)!; } - var eventData = DeserializeStoredEventData(incomingEvent.EventName, incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -257,7 +298,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected override byte[] Serialize(object eventData) { - return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventData)); + return JsonSerializer.SerializeToUtf8Bytes(eventData); } protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) @@ -283,89 +324,4 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return EventTypes.GetOrDefault(eventName); } - - protected override AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - protected override Task? TryPublishTypedByEventNameAsync( - string eventName, - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete, useOutbox); - } - - protected override Task PublishAnonymousByEventNameAsync( - AnonymousEventData anonymousEventData, - bool onUnitOfWorkComplete, - bool useOutbox) - { - if (!HasAnonymousEventName(anonymousEventData.EventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete, useOutbox); - } - - protected virtual bool TryResolveStoredEventType(string eventName, out Type eventType) - { - eventType = EventTypes.GetOrDefault(eventName)!; - if (eventType != null) - { - return true; - } - - if (!HasAnonymousEventName(eventName)) - { - return false; - } - - eventType = typeof(AnonymousEventData); - return true; - } - - protected virtual object DeserializeStoredEventData(string eventName, byte[] eventData, Type eventType) - { - if (eventType == typeof(AnonymousEventData)) - { - return CreateAnonymousEventData(eventName, eventData); - } - - return JsonSerializer.Deserialize(Encoding.UTF8.GetString(eventData), eventType)!; - } - - protected virtual void CleanupAnonymousEventName(string eventName) - { - if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any()) - { - AnonymousEventNames.TryRemove(eventName, out _); - } - } - - protected virtual bool HasAnonymousEventName(string eventName) - { - if (!AnonymousEventNames.ContainsKey(eventName)) - { - return false; - } - - if (!LocalEventBus.GetAnonymousEventHandlerFactories(eventName).Any()) - { - AnonymousEventNames.TryRemove(eventName, out _); - return false; - } - - return true; - } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index b4b589d6dd..8c344f5aa7 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -174,7 +174,9 @@ public abstract class EventBusBase : IEventBus actualEventType.GetGenericArguments().Length == 1 && typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType)) { - var resolvedEventData = ResolveActualEventData(eventData, actualEventType); + var resolvedEventData = eventData is AnonymousEventData aed + ? aed.ConvertToTypedObject(actualEventType) + : eventData; var genericArg = actualEventType.GetGenericArguments()[0]; var baseArg = genericArg.GetTypeInfo().BaseType; @@ -207,7 +209,7 @@ public abstract class EventBusBase : IEventBus { if (eventData is AnonymousEventData anonymousEventData && handlerEventType != typeof(AnonymousEventData)) { - return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, handlerEventType); + return anonymousEventData.ConvertToTypedObject(handlerEventType); } if (handlerEventType == typeof(AnonymousEventData) && eventData is not AnonymousEventData) @@ -218,16 +220,6 @@ public abstract class EventBusBase : IEventBus return eventData; } - protected virtual object ResolveActualEventData(object eventData, Type actualEventType) - { - if (eventData is AnonymousEventData anonymousEventData) - { - return AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, actualEventType); - } - - return eventData; - } - protected void ThrowOriginalExceptions(Type eventType, List exceptions) { if (exceptions.Count == 1) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 3281b95bd5..989191528c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -60,8 +60,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) { - var handlerFactories = GetOrCreateAnonymousHandlerFactories(eventName); - handlerFactories.Locking(factories => + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => { if (!handler.IsInFactories(factories)) { @@ -141,33 +140,21 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Remove(factory)); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); } /// public override void Unsubscribe(string eventName, IEventHandler handler) { - if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory singleFactory && - singleFactory.HandlerInstance == handler - ); - }); - - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); } /// @@ -179,21 +166,28 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency /// public override void UnsubscribeAll(string eventName) { - if (!TryGetAnonymousHandlerFactories(eventName, out var handlerFactories)) - { - return; - } - - handlerFactories.Locking(factories => factories.Clear()); - CleanupAnonymousHandlerFactoriesIfEmpty(eventName, handlerFactories); + GetOrCreateAnonymousHandlerFactories(eventName).Locking(factories => factories.Clear()); } /// public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var anonymousEventData = CreateAnonymousEnvelope(eventName, eventData); - return TryPublishTypedByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete) - ?? PublishAnonymousByEventNameAsync(eventName, anonymousEventData, onUnitOfWorkComplete); + var eventType = EventTypes.GetOrDefault(eventName); + + var anonymousEventData = eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, anonymousEventData.ConvertToTypedObject(eventType), onUnitOfWorkComplete); + } + + var isAnonymous = AnonymousEventHandlerFactories.ContainsKey(eventName); + if (!isAnonymous) + { + throw new AbpException($"Unknown event name: {eventName}"); + } + + return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); } protected override async Task PublishToEventBusAsync(Type eventType, object eventData) @@ -231,11 +225,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { - var handlerType = GetHandlerType(factory); handlerFactoryList.Add(new Tuple( factory, handlerFactory.Key, - ReflectionHelper.GetAttributesOfMemberOrDeclaringType(handlerType).FirstOrDefault()?.Order ?? 0)); + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); } } @@ -243,11 +236,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { - var handlerType = GetHandlerType(factory); handlerFactoryList.Add(new Tuple( factory, typeof(AnonymousEventData), - ReflectionHelper.GetAttributesOfMemberOrDeclaringType(handlerType).FirstOrDefault()?.Order ?? 0)); + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); } } @@ -268,7 +260,8 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { foreach (var factory in handlerFactory.Value) { - var handlerType = GetHandlerType(factory); + using var handler = factory.GetHandler(); + var handlerType = handler.EventHandler.GetType(); handlerFactoryList.Add(new Tuple( factory, typeof(AnonymousEventData), @@ -297,88 +290,6 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency return AnonymousEventHandlerFactories.GetOrAdd(eventName, (name) => new List()); } - private bool TryGetAnonymousHandlerFactories(string eventName, out List handlerFactories) - { - return AnonymousEventHandlerFactories.TryGetValue(eventName, out handlerFactories!); - } - - private AnonymousEventData CreateAnonymousEnvelope(string eventName, object eventData) - { - return eventData as AnonymousEventData ?? new AnonymousEventData(eventName, eventData); - } - - private Task? TryPublishTypedByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return null; - } - - var typedEventData = AnonymousEventDataConverter.ConvertToTypedObject(anonymousEventData, eventType); - return PublishAsync(eventType, typedEventData, onUnitOfWorkComplete); - } - - private Task PublishAnonymousByEventNameAsync(string eventName, AnonymousEventData anonymousEventData, bool onUnitOfWorkComplete) - { - if (!HasAnonymousHandlers(eventName)) - { - return Task.CompletedTask; - } - - return PublishAsync(typeof(AnonymousEventData), anonymousEventData, onUnitOfWorkComplete); - } - - private static Type GetHandlerType(IEventHandlerFactory factory) - { - if (factory is SingleInstanceHandlerFactory singleInstanceHandlerFactory) - { - return singleInstanceHandlerFactory.HandlerInstance.GetType(); - } - - if (factory is IocEventHandlerFactory iocEventHandlerFactory) - { - return iocEventHandlerFactory.HandlerType; - } - - if (factory is TransientEventHandlerFactory transientEventHandlerFactory) - { - return transientEventHandlerFactory.HandlerType; - } - - using var handler = factory.GetHandler(); - return handler.EventHandler.GetType(); - } - - private bool HasAnonymousHandlers(string eventName) - { - if (!AnonymousEventHandlerFactories.TryGetValue(eventName, out var handlerFactories)) - { - return false; - } - - var hasHandlers = false; - handlerFactories.Locking(factories => hasHandlers = factories.Count > 0); - - if (!hasHandlers) - { - AnonymousEventHandlerFactories.TryRemove(eventName, out _); - } - - return hasHandlers; - } - - private void CleanupAnonymousHandlerFactoriesIfEmpty(string eventName, List handlerFactories) - { - var isEmpty = false; - handlerFactories.Locking(factories => isEmpty = factories.Count == 0); - - if (isEmpty) - { - AnonymousEventHandlerFactories.TryRemove(eventName, out _); - } - } - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { if (handlerEventType == targetEventType) diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs index 673454c6be..5ceb4b1bb5 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Text; using System.Threading.Tasks; using Shouldly; using Volo.Abp.Domain.Entities.Events.Distributed; @@ -22,32 +21,20 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase [Fact] public async Task Should_Call_Handler_AndDispose() { - var handleCount = 0; - var disposeCount = 0; - var factory = new CountingDistributedEventHandlerFactory( - () => handleCount++, - () => disposeCount++); - - using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory); + using var subscription = DistributedEventBus.Subscribe(); await DistributedEventBus.PublishAsync(new MySimpleEventData(1)); await DistributedEventBus.PublishAsync(new MySimpleEventData(2)); await DistributedEventBus.PublishAsync(new MySimpleEventData(3)); - Assert.Equal(3, handleCount); - Assert.Equal(3, disposeCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); } [Fact] public async Task Should_Handle_Typed_Handler_When_Published_With_EventName() { - var handleCount = 0; - var disposeCount = 0; - var factory = new CountingDistributedEventHandlerFactory( - () => handleCount++, - () => disposeCount++); - - using var subscription = DistributedEventBus.Subscribe(typeof(MySimpleEventData), factory); + using var subscription = DistributedEventBus.Subscribe(); var eventName = EventNameAttribute.GetNameOrDefault(); await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1)); @@ -57,8 +44,8 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase }); await DistributedEventBus.PublishAsync(eventName, new { Value = 3 }); - Assert.Equal(3, handleCount); - Assert.Equal(3, disposeCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); } [Fact] @@ -95,7 +82,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { handleCount++; - AnonymousEventDataConverter.ConvertToLooseObject(d).ShouldNotBeNull(); + d.ConvertToTypedObject().ShouldNotBeNull(); await Task.CompletedTask; }))); @@ -200,15 +187,16 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } [Fact] - public async Task Should_Ignore_Unknown_Event_Name() + public async Task Should_Throw_For_Unknown_Event_Name() { - await DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); + await Assert.ThrowsAsync(() => + DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 })); } [Fact] public async Task Should_Convert_AnonymousEventData_To_Typed_Object() { - MySimpleEventData receivedData = null!; + MySimpleEventData? receivedData = null; using var subscription = DistributedEventBus.Subscribe(async (data) => { @@ -223,63 +211,6 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase receivedData.Value.ShouldBe(42); } - [Fact] - public async Task Should_Rehydrate_Anonymous_Event_From_Outbox_Using_Raw_Json() - { - var localDistributedEventBus = GetRequiredService(); - AnonymousEventData receivedData = null!; - var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); - - using var subscription = localDistributedEventBus.Subscribe(eventName, - new SingleInstanceHandlerFactory(new ActionEventHandler(async data => - { - receivedData = data; - await Task.CompletedTask; - }))); - - var outgoingEvent = new OutgoingEventInfo( - Guid.NewGuid(), - eventName, - Encoding.UTF8.GetBytes("{\"Value\":42}"), - DateTime.UtcNow); - - await localDistributedEventBus.PublishFromOutboxAsync(outgoingEvent, new OutboxConfig("Test") { DatabaseName = "Test" }); - - receivedData.ShouldNotBeNull(); - receivedData.EventName.ShouldBe(eventName); - AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("{\"Value\":42}"); - AnonymousEventDataConverter.ConvertToTypedObject(receivedData).Value.ShouldBe(42); - } - - [Fact] - public async Task Should_Rehydrate_Anonymous_Event_From_Inbox_Using_Raw_Json() - { - var localDistributedEventBus = GetRequiredService(); - AnonymousEventData receivedData = null!; - var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); - - using var subscription = localDistributedEventBus.Subscribe(eventName, - new SingleInstanceHandlerFactory(new ActionEventHandler(async data => - { - receivedData = data; - await Task.CompletedTask; - }))); - - var incomingEvent = new IncomingEventInfo( - Guid.NewGuid(), - Guid.NewGuid().ToString("N"), - eventName, - Encoding.UTF8.GetBytes("\"hello\""), - DateTime.UtcNow); - - await localDistributedEventBus.ProcessFromInboxAsync(incomingEvent, new InboxConfig("Test") { DatabaseName = "Test" }); - - receivedData.ShouldNotBeNull(); - receivedData.EventName.ShouldBe(eventName); - AnonymousEventDataConverter.GetJsonData(receivedData).ShouldBe("\"hello\""); - AnonymousEventDataConverter.ConvertToTypedObject(receivedData).ShouldBe("hello"); - } - [Fact] public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant() { @@ -378,57 +309,4 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } } - private sealed class CountingDistributedEventHandlerFactory : IEventHandlerFactory - { - private readonly Action _handleAction; - private readonly Action _disposeAction; - - public CountingDistributedEventHandlerFactory(Action handleAction, Action disposeAction) - { - _handleAction = handleAction; - _disposeAction = disposeAction; - } - - public IEventHandlerDisposeWrapper GetHandler() - { - var wasHandled = false; - return new EventHandlerDisposeWrapper( - new CountingDistributedEventHandler( - _handleAction, - () => wasHandled = true), - () => - { - if (wasHandled) - { - _disposeAction(); - } - } - ); - } - - public bool IsInFactories(List handlerFactories) - { - return handlerFactories.Contains(this); - } - } - - private sealed class CountingDistributedEventHandler : IDistributedEventHandler - { - private readonly Action _handleAction; - private readonly Action _markHandled; - - public CountingDistributedEventHandler(Action handleAction, Action markHandled) - { - _handleAction = handleAction; - _markHandled = markHandled; - } - - public Task HandleEventAsync(MySimpleEventData eventData) - { - _markHandled(); - _handleAction(); - return Task.CompletedTask; - } - } - } diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs index dfcd74bed8..7d3e5748e9 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Anonymous_Test.cs @@ -48,7 +48,7 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase [Fact] public async Task Should_Convert_Dictionary_To_Typed_Handler() { - MySimpleEventData receivedData = null!; + MySimpleEventData? receivedData = null; using var subscription = LocalEventBus.Subscribe(async (data) => { @@ -118,28 +118,29 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase } [Fact] - public async Task Should_Ignore_Unknown_Event_Name() + public async Task Should_Throw_For_Unknown_Event_Name() { - await LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); + await Assert.ThrowsAsync(() => + LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 })); } [Fact] public async Task Should_ConvertToTypedObject_In_Anonymous_Handler() { - object receivedData = null!; + object? receivedData = null; var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); using var subscription = LocalEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { - receivedData = AnonymousEventDataConverter.ConvertToLooseObject(d); + receivedData = d.ConvertToTypedObject(); await Task.CompletedTask; }))); await LocalEventBus.PublishAsync(eventName, new { Name = "Hello", Count = 42 }); receivedData.ShouldNotBeNull(); - var dict = receivedData.ShouldBeOfType>(); + var dict = receivedData.ShouldBeOfType>(); dict["Name"].ShouldBe("Hello"); dict["Count"].ShouldBe(42L); } @@ -147,13 +148,13 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase [Fact] public async Task Should_ConvertToTypedObject_Generic_In_Anonymous_Handler() { - MySimpleEventData receivedData = null!; + MySimpleEventData? receivedData = null; var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); using var subscription = LocalEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => { - receivedData = AnonymousEventDataConverter.ConvertToTypedObject(d); + receivedData = d.ConvertToTypedObject(); await Task.CompletedTask; }))); @@ -162,23 +163,4 @@ public class LocalEventBus_Anonymous_Test : EventBusTestBase receivedData.ShouldNotBeNull(); receivedData.Value.ShouldBe(99); } - - [Fact] - public void Should_Roundtrip_Raw_Json_Without_Object_Deserialization() - { - var eventData = AnonymousEventData.FromJson("TestEvent", "{\"Value\":42,\"Name\":\"hello\"}"); - - eventData.EventName.ShouldBe("TestEvent"); - AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("{\"Value\":42,\"Name\":\"hello\"}"); - AnonymousEventDataConverter.ConvertToTypedObject(eventData).Value.ShouldBe(42); - } - - [Fact] - public void Should_Preserve_String_Payload_As_Raw_Json() - { - var eventData = AnonymousEventData.FromJson("TestEvent", "\"hello\""); - - AnonymousEventDataConverter.GetJsonData(eventData).ShouldBe("\"hello\""); - AnonymousEventDataConverter.ConvertToTypedObject(eventData).ShouldBe("hello"); - } }