diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index ac1e8c6565..3668193c9b 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -43,7 +43,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB CorrelationIdProvider = correlationIdProvider; } - public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class + public virtual IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class { return Subscribe(typeof(TEvent), handler); } @@ -53,7 +53,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB return PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true); } - public Task PublishAsync( + public virtual Task PublishAsync( TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) @@ -154,7 +154,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB return Task.CompletedTask; } - protected async Task AddToInboxAsync( + protected virtual async Task AddToInboxAsync( string? messageId, string eventName, Type eventType, @@ -181,6 +181,9 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { if (await eventInbox.ExistsByMessageIdAsync(messageId!)) { + // Message already exists in the inbox, no need to add again. + // This can happen in case of retries from the sender side. + addToInbox = true; continue; } }