Browse Source

Inherit `DistributedEventBusBase`.

pull/8375/head
maliming 5 years ago
parent
commit
0b62e07236
  1. 6
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs
  2. 11
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs
  3. 67
      framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

6
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs

@ -6,6 +6,8 @@ namespace Volo.Abp.AzureServiceBus
{
byte[] Serialize(object obj);
object Deserialize(BinaryData value, Type type);
object Deserialize(byte[] value, Type type);
T Deserialize<T>(byte[] value);
}
}
}

11
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs

@ -19,9 +19,14 @@ namespace Volo.Abp.AzureServiceBus
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
}
public object Deserialize(BinaryData value, Type type)
public object Deserialize(byte[] value, Type type)
{
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value.ToArray()));
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value));
}
public T Deserialize<T>(byte[] value)
{
return _jsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(value));
}
}
}
}

67
framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

@ -9,36 +9,47 @@ using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.AzureServiceBus;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Azure
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus))]
public class AzureDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
public class AzureDistributedEventBus : DistributedEventBusBase, IDistributedEventBus, ISingletonDependency
{
private readonly AbpAzureEventBusOptions _options;
private readonly AbpDistributedEventBusOptions _distributedEventBusOptions;
private readonly IAzureServiceBusMessageConsumerFactory _messageConsumerFactory;
private readonly IPublisherPool _publisherPool;
private readonly IAzureServiceBusSerializer _serializer;
private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories;
private readonly ConcurrentDictionary<string, Type> _eventTypes;
private IAzureServiceBusMessageConsumer _consumer;
public AzureDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler,
IOptions<AbpAzureEventBusOptions> abpAzureEventBusOptions,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock,
IOptions<AbpAzureEventBusOptions> abpAzureEventBusOptions,
IAzureServiceBusSerializer serializer,
IAzureServiceBusMessageConsumerFactory messageConsumerFactory,
IPublisherPool publisherPool)
: base(serviceScopeFactory, currentTenant, errorHandler)
: base(serviceScopeFactory,
currentTenant,
unitOfWorkManager,
errorHandler,
abpDistributedEventBusOptions,
guidGenerator,
clock)
{
_options = abpAzureEventBusOptions.Value;
_distributedEventBusOptions = abpDistributedEventBusOptions.Value;
_serializer = serializer;
_messageConsumerFactory = messageConsumerFactory;
_publisherPool = publisherPool;
@ -54,7 +65,7 @@ namespace Volo.Abp.EventBus.Azure
_options.ConnectionName);
_consumer.OnMessageReceived(ProcessEventAsync);
SubscribeHandlers(_distributedEventBusOptions.Handlers);
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
private async Task ProcessEventAsync(ServiceBusReceivedMessage message)
@ -66,14 +77,36 @@ namespace Volo.Abp.EventBus.Azure
return;
}
var eventData = _serializer.Deserialize(message.Body, eventType);
var eventData = _serializer.Deserialize(message.Body.ToArray(), eventType);
await TriggerHandlersAsync(eventType, eventData);
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData);
}
public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = _eventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
}
var eventData = _serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}
protected override byte[] Serialize(object eventData)
{
return Subscribe(typeof(TEvent), handler);
return _serializer.Serialize(eventData);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
@ -142,16 +175,25 @@ namespace Volo.Abp.EventBus.Azure
.Locking(factories => factories.Clear());
}
public override async Task PublishAsync(Type eventType, object eventData)
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
protected virtual async Task PublishAsync(string eventName, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = _serializer.Serialize(eventData);
var message = new ServiceBusMessage(body)
{
Subject = eventName
};
var publisher = await _publisherPool.GetAsync(
_options.TopicName,
_options.ConnectionName);
@ -185,5 +227,6 @@ namespace Volo.Abp.EventBus.Azure
}
);
}
}
}

Loading…
Cancel
Save