Browse Source

Refactor `LocalDistributedEventBus`.

pull/22134/head
maliming 1 year ago
parent
commit
fbe3fe10bf
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 17
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventTypeWithEventHandlerFactories.cs
  2. 10
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs
  3. 6
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs
  4. 181
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs
  5. 13
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs
  6. 5
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs
  7. 6
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs

17
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventTypeWithEventHandlerFactories.cs

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
namespace Volo.Abp.EventBus;
public class EventTypeWithEventHandlerFactories
{
public Type EventType { get; }
public List<IEventHandlerFactory> EventHandlerFactories { get; }
public EventTypeWithEventHandlerFactories(Type eventType, List<IEventHandlerFactory> eventHandlerFactories)
{
EventType = eventType;
EventHandlerFactories = eventHandlerFactories;
}
}

10
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
namespace Volo.Abp.EventBus.Local;
@ -8,11 +9,18 @@ namespace Volo.Abp.EventBus.Local;
public interface ILocalEventBus : IEventBus
{
/// <summary>
/// Registers to an event.
/// Registers to an event.
/// Same (given) instance of the handler is used for all event occurrences.
/// </summary>
/// <typeparam name="TEvent">Event type</typeparam>
/// <param name="handler">Object to handle the event</param>
IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
where TEvent : class;
/// <summary>
/// Gets the list of event handler factories for the given event type.
/// </summary>
/// <param name="eventType">Event type</param>
/// <returns></returns>
List<EventTypeWithEventHandlerFactories> GetEventHandlerFactories(Type eventType);
}

6
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -62,7 +62,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox);
}
public async Task PublishAsync(
public virtual async Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true,
@ -227,7 +227,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
{
try
{
await LocalEventBus.PublishAsync(distributedEvent);
await LocalEventBus.PublishAsync(distributedEvent, onUnitOfWorkComplete: false);
}
catch (Exception)
{
@ -239,7 +239,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
{
try
{
await LocalEventBus.PublishAsync(distributedEvent);
await LocalEventBus.PublishAsync(distributedEvent, false);
}
catch (Exception)
{

181
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -1,7 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -9,7 +11,6 @@ using Microsoft.Extensions.Options;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Local;
using System.Linq;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Timing;
@ -22,66 +23,32 @@ namespace Volo.Abp.EventBus.Distributed;
[ExposeServices(typeof(IDistributedEventBus), typeof(LocalDistributedEventBus))]
public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
public LocalDistributedEventBus(IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, Volo.Abp.Uow.IUnitOfWorkManager unitOfWorkManager, IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, ILocalEventBus localEventBus, ICorrelationIdProvider correlationIdProvider)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider)
public LocalDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider)
: base(serviceScopeFactory,
currentTenant,
unitOfWorkManager,
abpDistributedEventBusOptions,
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus,
correlationIdProvider)
{
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
Subscribe(abpDistributedEventBusOptions.Value.Handlers);
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
}
var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType);
if (eventData == null)
{
return;
}
await LocalEventBus.PublishAsync(eventType, eventData);
}
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
if (eventType == null)
return;
var eventData = JsonSerializer.Deserialize(outgoingEvent.EventData, eventType);
if (eventData == null)
{
return;
}
await LocalEventBus.PublishAsync(eventType, eventData);
}
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
foreach (var outgoingEvent in outgoingEvents)
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}
}
public virtual void Subscribe(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
@ -103,7 +70,6 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
}
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
return LocalEventBus.Subscribe(eventType, factory);
@ -129,51 +95,114 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen
LocalEventBus.UnsubscribeAll(eventType);
}
public async override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox)
);
return;
}
if (useOutbox)
{
if (await AddToOutboxAsync(eventType, eventData))
{
return;
}
}
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
await PublishToEventBusAsync(eventType, eventData);
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await LocalEventBus.PublishAsync(eventType, eventData, false);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
)
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
Source = DistributedEventSource.Direct,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
return handlerFactoryList.ToArray();
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName)!;
var eventData = JsonSerializer.Deserialize(Encoding.UTF8.GetString(outgoingEvent.EventData), eventType)!;
await LocalEventBus.PublishAsync(eventType, eventData, false);
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
//Should trigger same type
if (handlerEventType == targetEventType)
foreach (var outgoingEvent in outgoingEvents)
{
return true;
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}
}
//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return true;
return;
}
return false;
var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData!, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
protected override byte[] Serialize(object eventData)
{
await LocalEventBus.PublishAsync(eventType, eventData);
return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventData));
}
protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData)
{
EventTypes.GetOrAdd(eventName, eventType);
return base.OnAddToOutboxAsync(eventName, eventType, eventData);
}
protected override byte[] Serialize(object eventData)
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
return JsonSerializer.SerializeToUtf8Bytes(eventData);
return LocalEventBus.GetEventHandlerFactories(eventType);
}
}

13
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs

@ -242,19 +242,6 @@ public abstract class EventBusBase : IEventBus
};
}
protected class EventTypeWithEventHandlerFactories
{
public Type EventType { get; }
public List<IEventHandlerFactory> EventHandlerFactories { get; }
public EventTypeWithEventHandlerFactories(Type eventType, List<IEventHandlerFactory> eventHandlerFactories)
{
EventType = eventType;
EventHandlerFactories = eventHandlerFactories;
}
}
// Reference from
// https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/
protected struct SynchronizationContextRemover : INotifyCompletion

5
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

@ -136,6 +136,11 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData);
}
public virtual List<EventTypeWithEventHandlerFactories> GetEventHandlerFactories(Type eventType)
{
return GetHandlerFactories(eventType).ToList();
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<Tuple<IEventHandlerFactory, Type, int>>();

6
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Local;
@ -22,6 +23,11 @@ public sealed class NullLocalEventBus : ILocalEventBus
return NullDisposable.Instance;
}
public List<EventTypeWithEventHandlerFactories> GetEventHandlerFactories(Type eventType)
{
return new List<EventTypeWithEventHandlerFactories>();
}
public IDisposable Subscribe<TEvent, THandler>() where TEvent : class where THandler : IEventHandler, new()
{
return NullDisposable.Instance;

Loading…
Cancel
Save