Browse Source

Add `OnAddToOutboxAsync` method.

pull/16333/head
maliming 3 years ago
parent
commit
d9e6e2c45c
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 72
      framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs
  2. 9
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
  3. 8
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  4. 8
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  5. 8
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  6. 12
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

72
framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

@ -22,13 +22,13 @@ namespace Volo.Abp.EventBus.Azure;
[ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus))]
public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
private readonly AbpAzureEventBusOptions _options;
private readonly IAzureServiceBusMessageConsumerFactory _messageConsumerFactory;
private readonly IPublisherPool _publisherPool;
private readonly IAzureServiceBusSerializer _serializer;
private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories;
private readonly ConcurrentDictionary<string, Type> _eventTypes;
private IAzureServiceBusMessageConsumer _consumer;
protected AbpAzureEventBusOptions Options { get; }
protected IAzureServiceBusMessageConsumerFactory MessageConsumerFactory { get; }
protected IPublisherPool PublisherPool { get; }
protected IAzureServiceBusSerializer Serializer { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IAzureServiceBusMessageConsumer Consumer { get; private set; }
public AzureDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
@ -52,22 +52,22 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
eventHandlerInvoker,
localEventBus)
{
_options = abpAzureEventBusOptions.Value;
_serializer = serializer;
_messageConsumerFactory = messageConsumerFactory;
_publisherPool = publisherPool;
_handlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
_eventTypes = new ConcurrentDictionary<string, Type>();
Options = abpAzureEventBusOptions.Value;
Serializer = serializer;
MessageConsumerFactory = messageConsumerFactory;
PublisherPool = publisherPool;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}
public void Initialize()
{
_consumer = _messageConsumerFactory.CreateMessageConsumer(
_options.TopicName,
_options.SubscriberName,
_options.ConnectionName);
Consumer = MessageConsumerFactory.CreateMessageConsumer(
Options.TopicName,
Options.SubscriberName,
Options.ConnectionName);
_consumer.OnMessageReceived(ProcessEventAsync);
Consumer.OnMessageReceived(ProcessEventAsync);
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
@ -78,13 +78,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
return;
}
var eventType = _eventTypes.GetOrDefault(eventName);
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}
var eventData = _serializer.Deserialize(message.Body.ToArray(), eventType);
var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType);
if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData))
{
@ -110,9 +110,9 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
var outgoingEventArray = outgoingEvents.ToArray();
var publisher = await _publisherPool.GetAsync(
_options.TopicName,
_options.ConnectionName);
var publisher = await PublisherPool.GetAsync(
Options.TopicName,
Options.ConnectionName);
using var messageBatch = await publisher.CreateMessageBatchAsync();
@ -144,13 +144,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = _eventTypes.GetOrDefault(incomingEvent.EventName);
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
}
var eventData = _serializer.Deserialize(incomingEvent.EventData, eventType);
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
if (exceptions.Any())
@ -161,7 +161,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected override byte[] Serialize(object eventData)
{
return _serializer.Serialize(eventData);
return Serializer.Serialize(eventData);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
@ -242,7 +242,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected virtual Task PublishAsync(string eventName, object eventData)
{
var body = _serializer.Serialize(eventData);
var body = Serializer.Serialize(eventData);
return PublishAsync(eventName, body, null);
}
@ -262,16 +262,16 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
message.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
}
var publisher = await _publisherPool.GetAsync(
_options.TopicName,
_options.ConnectionName);
var publisher = await PublisherPool.GetAsync(
Options.TopicName,
Options.ConnectionName);
await publisher.SendMessageAsync(message);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
return _handlerFactories
return HandlerFactories
.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
.Select(handlerFactory =>
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value))
@ -283,14 +283,20 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType);
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return _handlerFactories.GetOrAdd(
return HandlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
_eventTypes[eventName] = type;
EventTypes.GetOrAdd(eventName, eventType);
return new List<IEventHandlerFactory>();
}
);

9
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

@ -165,6 +165,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)));
}
}
@ -201,6 +202,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return Serializer.Serialize(eventData);
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
@ -208,7 +215,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
EventTypes.GetOrAdd(eventName, eventType);
return new List<IEventHandlerFactory>();
}
);

8
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -282,6 +282,12 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
});
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
@ -289,7 +295,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
EventTypes.GetOrAdd(eventName, eventType);
return new List<IEventHandlerFactory>();
}
);

8
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -358,6 +358,12 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
}
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
@ -365,7 +371,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
EventTypes.GetOrAdd(eventName, eventType);
return new List<IEventHandlerFactory>();
}
);

8
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -184,6 +184,12 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
@ -191,7 +197,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
EventTypes.GetOrAdd(eventName, eventType);
return new List<IEventHandlerFactory>();
}
);

12
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -105,7 +105,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig);
private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
protected virtual async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
{
var unitOfWork = UnitOfWorkManager.Current;
if (unitOfWork == null)
@ -117,10 +117,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
{
if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
{
var eventOutbox =
(IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
await OnAddToOutboxAsync(eventName, eventType, eventData);
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Direct,
@ -143,6 +144,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
return false;
}
protected virtual Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
return Task.CompletedTask;
}
protected async Task<bool> AddToInboxAsync(
string messageId,
string eventName,

Loading…
Cancel
Save