using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Local; using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Distributed; public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes { protected IGuidGenerator GuidGenerator { get; } protected IClock Clock { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected ILocalEventBus LocalEventBus { get; } protected DistributedEventBusBase( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, IUnitOfWorkManager unitOfWorkManager, IOptions abpDistributedEventBusOptions, IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, ILocalEventBus localEventBus) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker) { GuidGenerator = guidGenerator; Clock = clock; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; LocalEventBus = localEventBus; } public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class { return Subscribe(typeof(TEvent), handler); } public override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { return PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true); } public Task PublishAsync( TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class { return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox); } public async Task PublishAsync( Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) { AddToUnitOfWork( UnitOfWorkManager.Current, new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox) ); return; } if (useOutbox) { if (await AddToOutboxAsync(eventType, eventData)) { return; } } await TriggerDistributedEventSentAsync(new DistributedEventSent() { Source = DistributedEventSource.Direct, EventName = EventNameAttribute.GetNameOrDefault(eventType), EventData = eventData }); await PublishToEventBusAsync(eventType, eventData); } public abstract Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig ); public abstract Task PublishManyFromOutboxAsync( IEnumerable outgoingEvents, OutboxConfig outboxConfig ); public abstract Task ProcessFromInboxAsync( IncomingEventInfo incomingEvent, InboxConfig inboxConfig); protected virtual async Task AddToOutboxAsync(Type eventType, object eventData) { var unitOfWork = UnitOfWorkManager.Current; if (unitOfWork == null) { return false; } foreach (var outboxConfig in AbpDistributedEventBusOptions.Outboxes.Values.OrderBy(x => x.Selector is null)) { if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) { var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); var eventName = EventNameAttribute.GetNameOrDefault(eventType); await OnAddToOutboxAsync(eventName, eventType, eventData); await TriggerDistributedEventSentAsync(new DistributedEventSent() { Source = DistributedEventSource.Direct, EventName = eventName, EventData = eventData }); await eventOutbox.EnqueueAsync( new OutgoingEventInfo( GuidGenerator.Create(), eventName, Serialize(eventData), Clock.Now ) ); return true; } } return false; } protected virtual Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { return Task.CompletedTask; } protected async Task AddToInboxAsync( string messageId, string eventName, Type eventType, object eventData) { if (AbpDistributedEventBusOptions.Inboxes.Count <= 0) { return false; } using (var scope = ServiceScopeFactory.CreateScope()) { foreach (var inboxConfig in AbpDistributedEventBusOptions.Inboxes.Values.OrderBy(x => x.EventSelector is null)) { if (inboxConfig.EventSelector == null || inboxConfig.EventSelector(eventType)) { var eventInbox = (IEventInbox)scope.ServiceProvider.GetRequiredService(inboxConfig.ImplementationType); if (!messageId.IsNullOrEmpty()) { if (await eventInbox.ExistsByMessageIdAsync(messageId)) { continue; } } await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { Source = DistributedEventSource.Direct, EventName = EventNameAttribute.GetNameOrDefault(eventType), EventData = eventData }); await eventInbox.EnqueueAsync( new IncomingEventInfo( GuidGenerator.Create(), messageId, eventName, Serialize(eventData), Clock.Now ) ); } } } return true; } protected abstract byte[] Serialize(object eventData); protected virtual async Task TriggerHandlersDirectAsync(Type eventType, object eventData) { await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { Source = DistributedEventSource.Direct, EventName = EventNameAttribute.GetNameOrDefault(eventType), EventData = eventData }); await TriggerHandlersAsync(eventType, eventData); } protected virtual async Task TriggerHandlersFromInboxAsync(Type eventType, object eventData, List exceptions, InboxConfig inboxConfig = null) { await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { Source = DistributedEventSource.Inbox, EventName = EventNameAttribute.GetNameOrDefault(eventType), EventData = eventData }); await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); } public virtual async Task TriggerDistributedEventSentAsync(DistributedEventSent distributedEvent) { try { await LocalEventBus.PublishAsync(distributedEvent); } catch (Exception _) { // ignored } } public virtual async Task TriggerDistributedEventReceivedAsync(DistributedEventReceived distributedEvent) { try { await LocalEventBus.PublishAsync(distributedEvent); } catch (Exception _) { // ignored } } }