From 0b62e07236f167b51c588294dc1b91e13ffa412b Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 4 Oct 2021 11:01:47 +0800 Subject: [PATCH] Inherit `DistributedEventBusBase`. --- .../IAzureServiceBusSerializer.cs | 6 +- .../Utf8JsonAzureServiceBusSerializer.cs | 11 ++- .../Azure/AzureDistributedEventBus.cs | 67 +++++++++++++++---- 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs index 04f56d4470..61fc9e5ca0 100644 --- a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs @@ -6,6 +6,8 @@ namespace Volo.Abp.AzureServiceBus { byte[] Serialize(object obj); - object Deserialize(BinaryData value, Type type); + object Deserialize(byte[] value, Type type); + + T Deserialize(byte[] value); } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs index 0a87e22934..d8d6fb82a2 100644 --- a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs @@ -19,9 +19,14 @@ namespace Volo.Abp.AzureServiceBus return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); } - public object Deserialize(BinaryData value, Type type) + public object Deserialize(byte[] value, Type type) { - return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value.ToArray())); + return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); + } + + public T Deserialize(byte[] value) + { + return _jsonSerializer.Deserialize(Encoding.UTF8.GetString(value)); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 456284d1e8..f4fbca13b4 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -9,36 +9,47 @@ using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.AzureServiceBus; +using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; +using Volo.Abp.Timing; +using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Azure { [Dependency(ReplaceServices = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus))] - public class AzureDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency + public class AzureDistributedEventBus : DistributedEventBusBase, IDistributedEventBus, ISingletonDependency { private readonly AbpAzureEventBusOptions _options; - private readonly AbpDistributedEventBusOptions _distributedEventBusOptions; private readonly IAzureServiceBusMessageConsumerFactory _messageConsumerFactory; private readonly IPublisherPool _publisherPool; private readonly IAzureServiceBusSerializer _serializer; private readonly ConcurrentDictionary> _handlerFactories; private readonly ConcurrentDictionary _eventTypes; private IAzureServiceBusMessageConsumer _consumer; + public AzureDistributedEventBus( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, IEventErrorHandler errorHandler, - IOptions abpAzureEventBusOptions, IOptions abpDistributedEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock, + IOptions abpAzureEventBusOptions, IAzureServiceBusSerializer serializer, IAzureServiceBusMessageConsumerFactory messageConsumerFactory, IPublisherPool publisherPool) - : base(serviceScopeFactory, currentTenant, errorHandler) + : base(serviceScopeFactory, + currentTenant, + unitOfWorkManager, + errorHandler, + abpDistributedEventBusOptions, + guidGenerator, + clock) { _options = abpAzureEventBusOptions.Value; - _distributedEventBusOptions = abpDistributedEventBusOptions.Value; _serializer = serializer; _messageConsumerFactory = messageConsumerFactory; _publisherPool = publisherPool; @@ -54,7 +65,7 @@ namespace Volo.Abp.EventBus.Azure _options.ConnectionName); _consumer.OnMessageReceived(ProcessEventAsync); - SubscribeHandlers(_distributedEventBusOptions.Handlers); + SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); } private async Task ProcessEventAsync(ServiceBusReceivedMessage message) @@ -66,14 +77,36 @@ namespace Volo.Abp.EventBus.Azure return; } - var eventData = _serializer.Deserialize(message.Body, eventType); + var eventData = _serializer.Deserialize(message.Body.ToArray(), eventType); await TriggerHandlersAsync(eventType, eventData); } - public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class + public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) + { + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData); + } + + public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) + { + var eventType = _eventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) + { + return; + } + + var eventData = _serializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); + await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); + if (exceptions.Any()) + { + ThrowOriginalExceptions(eventType, exceptions); + } + } + + protected override byte[] Serialize(object eventData) { - return Subscribe(typeof(TEvent), handler); + return _serializer.Serialize(eventData); } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) @@ -142,16 +175,25 @@ namespace Volo.Abp.EventBus.Azure .Locking(factories => factories.Clear()); } - public override async Task PublishAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) + { + await PublishAsync(eventType, eventData); + } + + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + + protected virtual async Task PublishAsync(string eventName, object eventData) { - var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = _serializer.Serialize(eventData); var message = new ServiceBusMessage(body) { Subject = eventName }; - + var publisher = await _publisherPool.GetAsync( _options.TopicName, _options.ConnectionName); @@ -185,5 +227,6 @@ namespace Volo.Abp.EventBus.Azure } ); } + } }