diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 100d2d9942..8279a412a5 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -22,13 +22,13 @@ namespace Volo.Abp.EventBus.Azure; [ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus))] public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDependency { - private readonly AbpAzureEventBusOptions _options; - private readonly IAzureServiceBusMessageConsumerFactory _messageConsumerFactory; - private readonly IPublisherPool _publisherPool; - private readonly IAzureServiceBusSerializer _serializer; - private readonly ConcurrentDictionary> _handlerFactories; - private readonly ConcurrentDictionary _eventTypes; - private IAzureServiceBusMessageConsumer _consumer; + protected AbpAzureEventBusOptions Options { get; } + protected IAzureServiceBusMessageConsumerFactory MessageConsumerFactory { get; } + protected IPublisherPool PublisherPool { get; } + protected IAzureServiceBusSerializer Serializer { get; } + protected ConcurrentDictionary> HandlerFactories { get; } + protected ConcurrentDictionary EventTypes { get; } + protected IAzureServiceBusMessageConsumer Consumer { get; private set; } public AzureDistributedEventBus( IServiceScopeFactory serviceScopeFactory, @@ -52,22 +52,22 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen eventHandlerInvoker, localEventBus) { - _options = abpAzureEventBusOptions.Value; - _serializer = serializer; - _messageConsumerFactory = messageConsumerFactory; - _publisherPool = publisherPool; - _handlerFactories = new ConcurrentDictionary>(); - _eventTypes = new ConcurrentDictionary(); + Options = abpAzureEventBusOptions.Value; + Serializer = serializer; + MessageConsumerFactory = messageConsumerFactory; + PublisherPool = publisherPool; + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); } public void Initialize() { - _consumer = _messageConsumerFactory.CreateMessageConsumer( - _options.TopicName, - _options.SubscriberName, - _options.ConnectionName); + Consumer = MessageConsumerFactory.CreateMessageConsumer( + Options.TopicName, + Options.SubscriberName, + Options.ConnectionName); - _consumer.OnMessageReceived(ProcessEventAsync); + Consumer.OnMessageReceived(ProcessEventAsync); SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); } @@ -78,13 +78,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return; } - var eventType = _eventTypes.GetOrDefault(eventName); + var eventType = EventTypes.GetOrDefault(eventName); if (eventType == null) { return; } - var eventData = _serializer.Deserialize(message.Body.ToArray(), eventType); + var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData)) { @@ -110,9 +110,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var outgoingEventArray = outgoingEvents.ToArray(); - var publisher = await _publisherPool.GetAsync( - _options.TopicName, - _options.ConnectionName); + var publisher = await PublisherPool.GetAsync( + Options.TopicName, + Options.ConnectionName); using var messageBatch = await publisher.CreateMessageBatchAsync(); @@ -144,13 +144,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = _eventTypes.GetOrDefault(incomingEvent.EventName); + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); if (eventType == null) { return; } - var eventData = _serializer.Deserialize(incomingEvent.EventData, eventType); + var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); if (exceptions.Any()) @@ -161,7 +161,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected override byte[] Serialize(object eventData) { - return _serializer.Serialize(eventData); + return Serializer.Serialize(eventData); } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) @@ -242,7 +242,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected virtual Task PublishAsync(string eventName, object eventData) { - var body = _serializer.Serialize(eventData); + var body = Serializer.Serialize(eventData); return PublishAsync(eventName, body, null); } @@ -262,16 +262,16 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen message.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } - var publisher = await _publisherPool.GetAsync( - _options.TopicName, - _options.ConnectionName); + var publisher = await PublisherPool.GetAsync( + Options.TopicName, + Options.ConnectionName); await publisher.SendMessageAsync(message); } protected override IEnumerable GetHandlerFactories(Type eventType) { - return _handlerFactories + return HandlerFactories .Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) .Select(handlerFactory => new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)) @@ -283,14 +283,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); + } + private List GetOrCreateHandlerFactories(Type eventType) { - return _handlerFactories.GetOrAdd( + return HandlerFactories.GetOrAdd( eventType, type => { var eventName = EventNameAttribute.GetNameOrDefault(type); - _eventTypes[eventName] = type; + EventTypes.GetOrAdd(eventName, eventType); return new List(); } ); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 28077ad43e..23cf8aa1e9 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -165,6 +165,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend EventName = outgoingEvent.EventName, EventData = outgoingEvent.EventData }); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); } } @@ -201,6 +202,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return Serializer.Serialize(eventData); } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( @@ -208,7 +215,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend type => { var eventName = EventNameAttribute.GetNameOrDefault(type); - EventTypes[eventName] = type; + EventTypes.GetOrAdd(eventName, eventType); return new List(); } ); diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 057b081ad8..62e161fad1 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -282,6 +282,12 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen }); } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( @@ -289,7 +295,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen type => { var eventName = EventNameAttribute.GetNameOrDefault(type); - EventTypes[eventName] = type; + EventTypes.GetOrAdd(eventName, eventType); return new List(); } ); diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 050f730dba..8d4756165a 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -358,6 +358,12 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe } } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( @@ -365,7 +371,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe type => { var eventName = EventNameAttribute.GetNameOrDefault(type); - EventTypes[eventName] = type; + EventTypes.GetOrAdd(eventName, eventType); return new List(); } ); diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index befc734aa3..0983e98836 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -184,6 +184,12 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + EventTypes.GetOrAdd(eventName, eventType); + return base.OnAddToOutboxAsync(eventName, eventType, eventData); + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( @@ -191,7 +197,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen type => { var eventName = EventNameAttribute.GetNameOrDefault(type); - EventTypes[eventName] = type; + EventTypes.GetOrAdd(eventName, eventType); return new List(); } ); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 871fb9459f..89423ce111 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -105,7 +105,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB IncomingEventInfo incomingEvent, InboxConfig inboxConfig); - private async Task AddToOutboxAsync(Type eventType, object eventData) + protected virtual async Task AddToOutboxAsync(Type eventType, object eventData) { var unitOfWork = UnitOfWorkManager.Current; if (unitOfWork == null) @@ -117,10 +117,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) { - var eventOutbox = - (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); + var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); var eventName = EventNameAttribute.GetNameOrDefault(eventType); + await OnAddToOutboxAsync(eventName, eventType, eventData); + await TriggerDistributedEventSentAsync(new DistributedEventSent() { Source = DistributedEventSource.Direct, @@ -143,6 +144,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB return false; } + protected virtual Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + return Task.CompletedTask; + } + protected async Task AddToInboxAsync( string messageId, string eventName,