Browse Source

Changes to make LocalDistributedEventBus able to use outbox/inbox patterns.

pull/22134/head
Nuno Vieira 1 year ago
committed by maliming
parent
commit
2d84e52fcd
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 197
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

197
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -1,35 +1,87 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Local;
using System.Linq;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Distributed;
[Dependency(TryRegister = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(LocalDistributedEventBus))]
public class LocalDistributedEventBus : IDistributedEventBus, ISingletonDependency
public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
private readonly ILocalEventBus _localEventBus;
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
public LocalDistributedEventBus(IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, Volo.Abp.Uow.IUnitOfWorkManager unitOfWorkManager, IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, ILocalEventBus localEventBus, ICorrelationIdProvider correlationIdProvider)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider)
{
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
Subscribe(abpDistributedEventBusOptions.Value.Handlers);
}
public LocalDistributedEventBus(
ILocalEventBus localEventBus,
IServiceScopeFactory serviceScopeFactory,
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions)
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
_localEventBus = localEventBus;
ServiceScopeFactory = serviceScopeFactory;
AbpDistributedEventBusOptions = distributedEventBusOptions.Value;
Subscribe(distributedEventBusOptions.Value.Handlers);
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
}
var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType);
if (eventData == null)
{
return;
}
await LocalEventBus.PublishAsync(eventType, eventData);
}
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
if (eventType == null)
return;
var eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType);
if (eventData == null)
{
return;
}
await LocalEventBus.PublishAsync(eventType, eventData);
}
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
foreach (var outgoingEvent in outgoingEvents)
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}
}
public virtual void Subscribe(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
@ -51,122 +103,77 @@ public class LocalDistributedEventBus : IDistributedEventBus, ISingletonDependen
}
}
/// <inheritdoc/>
public virtual IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
return _localEventBus.Subscribe(action);
return LocalEventBus.Subscribe(eventType, factory);
}
public IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
return _localEventBus.Subscribe(handler);
LocalEventBus.Unsubscribe(action);
}
public IDisposable Subscribe<TEvent, THandler>() where TEvent : class where THandler : IEventHandler, new()
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
return _localEventBus.Subscribe<TEvent, THandler>();
LocalEventBus.Unsubscribe(eventType, handler);
}
public IDisposable Subscribe(Type eventType, IEventHandler handler)
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
return _localEventBus.Subscribe(eventType, handler);
LocalEventBus.Unsubscribe(eventType, factory);
}
public IDisposable Subscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
public override void UnsubscribeAll(Type eventType)
{
return _localEventBus.Subscribe<TEvent>(factory);
LocalEventBus.UnsubscribeAll(eventType);
}
public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
return _localEventBus.Subscribe(eventType, factory);
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
public void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
_localEventBus.Unsubscribe(action);
}
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
public void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
{
_localEventBus.Unsubscribe(handler);
}
public void Unsubscribe(Type eventType, IEventHandler handler)
{
_localEventBus.Unsubscribe(eventType, handler);
}
public void Unsubscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
_localEventBus.Unsubscribe<TEvent>(factory);
}
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
)
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
public void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
_localEventBus.Unsubscribe(eventType, factory);
return handlerFactoryList.ToArray();
}
public void UnsubscribeAll<TEvent>() where TEvent : class
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
_localEventBus.UnsubscribeAll<TEvent>();
}
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}
public void UnsubscribeAll(Type eventType)
{
_localEventBus.UnsubscribeAll(eventType);
}
//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
return false;
}
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
}
await LocalEventBus.PublishAsync(eventType, eventData);
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}
private async Task PublishDistributedEventSentReceivedAsync(Type eventType, object eventData, bool onUnitOfWorkComplete)
protected override byte[] Serialize(object eventData)
{
if (eventType != typeof(DistributedEventSent))
{
await _localEventBus.PublishAsync(new DistributedEventSent
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
}, onUnitOfWorkComplete);
}
if (eventType != typeof(DistributedEventReceived))
{
await _localEventBus.PublishAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
}, onUnitOfWorkComplete);
}
return JsonSerializer.SerializeToUtf8Bytes(eventData);
}
}
}

Loading…
Cancel
Save