diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 13de378855..32b07f1b82 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -102,6 +102,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -111,8 +113,6 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); } - - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -141,7 +141,12 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen throw new AbpException( "The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number"); } + } + await publisher.SendMessagesAsync(messageBatch); + + foreach (var outgoingEvent in outgoingEventArray) + { using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -152,8 +157,6 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen }); } } - - await publisher.SendMessagesAsync(messageBatch); } public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index ce6dee2cfc..2b0d2c7d0d 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -153,6 +153,8 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -162,8 +164,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend EventData = outgoingEvent.EventData }); } - - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -172,6 +172,8 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend foreach (var outgoingEvent in outgoingEventArray) { + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -181,8 +183,6 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend EventData = outgoingEvent.EventData }); } - - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); } } diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index a777ff7da2..4bdf6a2782 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -197,16 +197,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) - { - await TriggerDistributedEventSentAsync(new DistributedEventSent() - { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); - } - var headers = new Headers { { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } @@ -222,6 +212,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen outgoingEvent.EventData, headers ); + + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) + { + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -242,6 +242,15 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); } + producer.Produce( + AbpKafkaEventBusOptions.TopicName, + new Message + { + Key = outgoingEvent.EventName, + Value = outgoingEvent.EventData, + Headers = headers + }); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -251,15 +260,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); } - - producer.Produce( - AbpKafkaEventBusOptions.TopicName, - new Message - { - Key = outgoingEvent.EventName, - Value = outgoingEvent.EventData, - Headers = headers - }); } } diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 1c8012f529..bc1c63f3ac 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -207,6 +207,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId()); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -216,8 +218,6 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe EventData = outgoingEvent.EventData }); } - - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId()); } public async override Task PublishManyFromOutboxAsync( @@ -231,6 +231,13 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe foreach (var outgoingEvent in outgoingEventArray) { + await PublishAsync( + channel, + outgoingEvent.EventName, + outgoingEvent.EventData, + eventId: outgoingEvent.Id, + correlationId: outgoingEvent.GetCorrelationId()); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -240,13 +247,6 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe EventData = outgoingEvent.EventData }); } - - await PublishAsync( - channel, - outgoingEvent.EventName, - outgoingEvent.EventData, - eventId: outgoingEvent.Id, - correlationId: outgoingEvent.GetCorrelationId()); } channel.WaitForConfirmsOrDie(); diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 4b4dd3eef7..9e8398a495 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -253,6 +253,14 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName)!; var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + var headers = new Dictionary(); + if (outgoingEvent.GetCorrelationId() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!); + } + + await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() { @@ -261,14 +269,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); } - - var headers = new Dictionary(); - if (outgoingEvent.GetCorrelationId() != null) - { - headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!); - } - - await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -279,6 +279,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { foreach (var outgoingEvent in outgoingEventArray) { + await PublishFromOutboxAsync(outgoingEvent, outboxConfig); + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -288,8 +290,6 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen EventData = outgoingEvent.EventData }); } - - await PublishFromOutboxAsync(outgoingEvent, outboxConfig); } await scope.CompleteAsync(); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 96a11f928e..ad69ed124b 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -85,14 +85,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB } } + await PublishToEventBusAsync(eventType, eventData); + await TriggerDistributedEventSentAsync(new DistributedEventSent() { Source = DistributedEventSource.Direct, EventName = EventNameAttribute.GetNameOrDefault(eventType), EventData = eventData }); - - await PublishToEventBusAsync(eventType, eventData); } public abstract Task PublishFromOutboxAsync( diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index 2551fb54fa..7315063214 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -122,24 +122,50 @@ public class LocalDistributedEventBus : IDistributedEventBus, ISingletonDependen _localEventBus.UnsubscribeAll(eventType); } - public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) + public async Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class { - return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); } - public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) + public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { - return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete); } - public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class + public async Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class { - return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete); + await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete); } - public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { - return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete); + await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete); + } + + private async Task PublishDistributedEventSentReceivedAsync(Type eventType, object eventData, bool onUnitOfWorkComplete) + { + if (eventType == typeof(DistributedEventSent) || eventType == typeof(DistributedEventReceived)) + { + return; + } + + await _localEventBus.PublishAsync(new DistributedEventSent + { + Source = DistributedEventSource.Direct, + EventName = EventNameAttribute.GetNameOrDefault(eventType), + EventData = eventData + }, onUnitOfWorkComplete); + + await _localEventBus.PublishAsync(new DistributedEventReceived + { + Source = DistributedEventSource.Direct, + EventName = EventNameAttribute.GetNameOrDefault(eventType), + EventData = eventData + }, onUnitOfWorkComplete); } } diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/DistributedEventHandles.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/DistributedEventHandles.cs deleted file mode 100644 index eb1f1a52df..0000000000 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/DistributedEventHandles.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.Threading.Tasks; - -namespace Volo.Abp.EventBus.Distributed; - -public class DistributedEventHandles : ILocalEventHandler, ILocalEventHandler -{ - public static int SentCount { get; set; } - - public static int ReceivedCount { get; set; } - - public Task HandleEventAsync(DistributedEventSent eventData) - { - SentCount++; - return Task.CompletedTask; - } - - public Task HandleEventAsync(DistributedEventReceived eventData) - { - ReceivedCount++; - return Task.CompletedTask; - } -} diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs index 880e2810cd..33362dc90b 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; +using Shouldly; using Volo.Abp.Domain.Entities.Events.Distributed; using Volo.Abp.EventBus.Local; using Volo.Abp.Uow; @@ -11,12 +10,6 @@ namespace Volo.Abp.EventBus.Distributed; public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase { - protected override void AfterAddApplication(IServiceCollection services) - { - services.Replace(ServiceDescriptor.Singleton()); - base.AfterAddApplication(services); - } - [Fact] public async Task Should_Call_Handler_AndDispose() { @@ -76,60 +69,56 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase public async Task DistributedEventSentAndReceived_Test() { var localEventBus = GetRequiredService(); - if (localEventBus is UnitTestLocalEventBus eventBus) - { - eventBus.OnEventHandleInvoking = async (eventType, eventData) => - { - await localEventBus.PublishAsync(new DistributedEventReceived() - { - Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData - }, onUnitOfWorkComplete: false); - }; - - eventBus.OnPublishing = async (eventType, eventData) => - { - await localEventBus.PublishAsync(new DistributedEventSent() - { - Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData - }, onUnitOfWorkComplete: false); - }; - } - GetRequiredService().Subscribe(); - GetRequiredService().Subscribe(); + localEventBus.Subscribe(); + localEventBus.Subscribe(); DistributedEventBus.Subscribe(); using (var uow = GetRequiredService().Begin()) { + MyEventDate.Order = string.Empty; await DistributedEventBus.PublishAsync(new MyEventDate(), onUnitOfWorkComplete: false); - Assert.Equal(1, DistributedEventHandles.SentCount); - Assert.Equal(1, DistributedEventHandles.ReceivedCount); + MyEventDate.Order.ShouldBe(nameof(MyEventHandle) + nameof(DistributedEventSent) + nameof(DistributedEventReceived)); + MyEventDate.Order = string.Empty; await DistributedEventBus.PublishAsync(new MyEventDate(), onUnitOfWorkComplete: true); + MyEventDate.Order.ShouldBe(string.Empty); await uow.CompleteAsync(); - Assert.Equal(2, DistributedEventHandles.SentCount); - Assert.Equal(2, DistributedEventHandles.ReceivedCount); + MyEventDate.Order.ShouldBe(nameof(MyEventHandle) + nameof(DistributedEventSent) + nameof(DistributedEventReceived)); } } class MyEventDate { - + public static string Order { get; set; } = string.Empty; } class MyEventHandle : IDistributedEventHandler { public Task HandleEventAsync(MyEventDate eventData) { + MyEventDate.Order += nameof(MyEventHandle); return Task.CompletedTask; } } + + class DistributedEventHandles : ILocalEventHandler, ILocalEventHandler + { + public Task HandleEventAsync(DistributedEventSent eventData) + { + MyEventDate.Order += nameof(DistributedEventSent); + return Task.CompletedTask; + } + + public Task HandleEventAsync(DistributedEventReceived eventData) + { + MyEventDate.Order += nameof(DistributedEventReceived); + return Task.CompletedTask; + } + } + } diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/UnitTestLocalEventBus.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/UnitTestLocalEventBus.cs deleted file mode 100644 index 1511d6a071..0000000000 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/UnitTestLocalEventBus.cs +++ /dev/null @@ -1,62 +0,0 @@ -using System; -using System.Threading.Tasks; -using JetBrains.Annotations; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using Volo.Abp.EventBus.Local; -using Volo.Abp.MultiTenancy; -using Volo.Abp.Uow; - -namespace Volo.Abp.EventBus.Distributed; - -/// -/// This class is used in unit tests and supports to publish DistributedEventSent and DistributedEventReceived events. -/// -public class UnitTestLocalEventBus : LocalEventBus -{ - public UnitTestLocalEventBus( - [NotNull] IOptions options, - [NotNull] IServiceScopeFactory serviceScopeFactory, - [NotNull] ICurrentTenant currentTenant, - [NotNull] IUnitOfWorkManager unitOfWorkManager, - [NotNull] IEventHandlerInvoker eventHandlerInvoker) - : base(options, serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker) - { - } - - public Func OnEventHandleInvoking { get; set; } - - protected async override Task InvokeEventHandlerAsync(IEventHandler eventHandler, object eventData, Type eventType) - { - if (OnEventHandleInvoking != null && eventType != typeof(DistributedEventSent) && eventType != typeof(DistributedEventReceived)) - { - await OnEventHandleInvoking(eventType, eventData); - } - - await base.InvokeEventHandlerAsync(eventHandler, eventData, eventType); - } - - public Func OnPublishing { get; set; } - - public async override Task PublishAsync( - Type eventType, - object eventData, - bool onUnitOfWorkComplete = true) - { - if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) - { - AddToUnitOfWork( - UnitOfWorkManager.Current, - new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext()) - ); - return; - } - - if (OnPublishing != null && eventType != typeof(DistributedEventSent) && eventType != typeof(DistributedEventReceived)) - { - await OnPublishing(eventType, eventData); - } - - await PublishToEventBusAsync(eventType, eventData); - } -}