From d7dd915d767150d61402ac67becc6c5599988a58 Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 19 Aug 2025 14:22:32 +0800 Subject: [PATCH 01/11] Add error handling to InboxProcessor event processing Wrapped event processing in a try-catch block to log errors when processing individual events fails. This prevents one failed event from interrupting the processing of subsequent events and improves reliability. --- .../EventBus/Distributed/InboxProcessor.cs | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 06014701a2..f0d86627ae 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -103,18 +103,25 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency foreach (var waitingEvent in waitingEvents) { - using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + try { - await DistributedEventBus - .AsSupportsEventBoxes() - .ProcessFromInboxAsync(waitingEvent, InboxConfig); + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + await DistributedEventBus + .AsSupportsEventBoxes() + .ProcessFromInboxAsync(waitingEvent, InboxConfig); - await Inbox.MarkAsProcessedAsync(waitingEvent.Id); + await Inbox.MarkAsProcessedAsync(waitingEvent.Id); - await uow.CompleteAsync(StoppingToken); - } + await uow.CompleteAsync(StoppingToken); + } - Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); + Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); + } + catch (Exception e) + { + Logger.LogError(e, $"An error occurred while processing the incoming event with id = {waitingEvent.Id:N}"); + } } } } From 1292074763c91e943afb155cef048fc527758600 Mon Sep 17 00:00:00 2001 From: maliming Date: Thu, 21 Aug 2025 18:29:11 +0800 Subject: [PATCH 02/11] Added retry and discard feature. --- .../DistributedEvents/DbContextEventInbox.cs | 24 ++++++- ...entInboxDbContextModelBuilderExtensions.cs | 2 +- .../DistributedEvents/IncomingEventRecord.cs | 25 ++++++-- .../Abp/EventBus/Distributed/IEventInbox.cs | 4 ++ .../EventBus/Distributed/IncomingEventInfo.cs | 9 +++ .../Distributed/IncomingEventStatus.cs | 10 +++ .../EventBus/Distributed/InboxProcessor.cs | 63 ++++++++++++++++++- .../InboxProcessorFailurePolicy.cs | 21 +++++++ .../Distributed/InboxProcessorOptions.cs | 8 +++ .../DistributedEvents/IncomingEventRecord.cs | 28 +++++++-- .../MongoDbContextEventInbox.cs | 44 +++++++++++-- 11 files changed, 219 insertions(+), 19 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs create mode 100644 framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs create mode 100644 framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 54ddfd0e5e..9a5ac3f154 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -50,7 +50,7 @@ public class DbContextEventInbox : IDbContextEventInbox var outgoingEventRecords = await dbContext .IncomingEvents .AsNoTracking() - .Where(x => !x.Processed) + .Where(x => x.Status == IncomingEventStatus.Pending) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) @@ -66,7 +66,25 @@ public class DbContextEventInbox : IDbContextEventInbox { var dbContext = await DbContextProvider.GetDbContextAsync(); await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => - x.SetProperty(p => p.Processed, _ => true).SetProperty(p => p.ProcessedTime, _ => Clock.Now)); + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Processed).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now)); + } + + [UnitOfWork] + public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => + x.SetProperty(p => p.RetryCount, _ => retryCount) + .SetProperty(p => p.NextRetryTime, _ => nextRetryTime) + .SetProperty(p => p.Status, _ => IncomingEventStatus.Pending)); + } + + [UnitOfWork] + public virtual async Task MarkAsDiscardAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Discarded).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now)); } [UnitOfWork] @@ -82,7 +100,7 @@ public class DbContextEventInbox : IDbContextEventInbox var dbContext = await DbContextProvider.GetDbContextAsync(); var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; await dbContext.IncomingEvents - .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) + .Where(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents) .ExecuteDeleteAsync(); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs index fbe91c2def..b7474c3b11 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs @@ -16,7 +16,7 @@ public static class EventInboxDbContextModelBuilderExtensions b.Property(x => x.EventName).IsRequired().HasMaxLength(IncomingEventRecord.MaxEventNameLength); b.Property(x => x.EventData).IsRequired(); - b.HasIndex(x => new { x.Processed, x.CreationTime }); + b.HasIndex(x => new { x.Status, x.CreationTime }); b.HasIndex(x => x.MessageId); }); } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index be7da15890..4f01dfbe37 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -24,9 +24,13 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public bool Processed { get; set; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? ProcessedTime { get; set; } + public DateTime? DiscardedOrProcessedTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; protected IncomingEventRecord() { @@ -71,7 +75,20 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { - Processed = true; - ProcessedTime = processedTime; + Status = IncomingEventStatus.Processed; + DiscardedOrProcessedTime = processedTime; + } + + public void MarkAsDiscarded(DateTime discardedTime) + { + Status = IncomingEventStatus.Discarded; + DiscardedOrProcessedTime = discardedTime; + } + + public void RetryLater(int retryCount, DateTime nextRetryTime) + { + Status = IncomingEventStatus.Pending; + NextRetryTime = nextRetryTime; + RetryCount = retryCount; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs index c154330495..66b605d576 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -14,6 +14,10 @@ public interface IEventInbox Task MarkAsProcessedAsync(Guid id); + Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime); + + Task MarkAsDiscardAsync(Guid id); + Task ExistsByMessageIdAsync(string messageId); Task DeleteOldEventsAsync(); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index 1be24a3d02..855d4a7302 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -20,6 +20,14 @@ public class IncomingEventInfo : IIncomingEventInfo public DateTime CreationTime { get; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; + + public DateTime? DiscardedOrProcessedTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; + protected IncomingEventInfo() { ExtraProperties = new ExtraPropertyDictionary(); @@ -38,6 +46,7 @@ public class IncomingEventInfo : IIncomingEventInfo EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); EventData = eventData; CreationTime = creationTime; + ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs new file mode 100644 index 0000000000..27a210770c --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs @@ -0,0 +1,10 @@ +namespace Volo.Abp.EventBus.Distributed; + +public enum IncomingEventStatus +{ + Pending = 0, + + Discarded = 1, + + Processed = 2 +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index f0d86627ae..bbd203011a 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -25,7 +25,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected IEventInbox Inbox { get; private set; } = default!; protected InboxConfig InboxConfig { get; private set; } = default!; protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } - + protected InboxProcessorOptions InboxProcessorOptions { get; set; } protected DateTime? LastCleanTime { get; set; } protected string DistributedLockName { get; set; } = default!; @@ -40,7 +40,8 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency IAbpDistributedLock distributedLock, IUnitOfWorkManager unitOfWorkManager, IClock clock, - IOptions eventBusBoxesOptions) + IOptions eventBusBoxesOptions, + IOptions inboxProcessorOptions) { ServiceProvider = serviceProvider; Timer = timer; @@ -49,6 +50,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; + InboxProcessorOptions = inboxProcessorOptions.Value; Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds); Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; @@ -103,6 +105,12 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency foreach (var waitingEvent in waitingEvents) { + if (waitingEvent.NextRetryTime.HasValue && waitingEvent.NextRetryTime.Value > Clock.Now) + { + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} is not ready to be processed yet. Next retry time: {waitingEvent.NextRetryTime.Value}"); + continue; + } + try { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) @@ -121,6 +129,45 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency catch (Exception e) { Logger.LogError(e, $"An error occurred while processing the incoming event with id = {waitingEvent.Id:N}"); + + if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Retry) + { + throw; + } + + if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.RetryLater) + { + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + if (waitingEvent.RetryCount >= InboxProcessorOptions.MaxRetryCount) + { + Logger.LogWarning($"Max retry count reached for event with id = {waitingEvent.Id:N}. Discarding the event."); + + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); + await uow.CompleteAsync(StoppingToken); + continue; + } + + Logger.LogInformation($"Retrying event with id = {waitingEvent.Id:N}. " + + $"Retry count: {waitingEvent.RetryCount}, " + + $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)}"); + + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount++, GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)); + await uow.CompleteAsync(StoppingToken); + } + continue; + } + + if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Discard) + { + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + Logger.LogInformation($"Discarding event with id = {waitingEvent.Id:N} due to an error."); + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); + await uow.CompleteAsync(StoppingToken); + } + continue; + } } } } @@ -137,6 +184,18 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } } + protected virtual DateTime? GetNextRetryTime(int retryCount, int maxRetryCount) + { + if (retryCount > maxRetryCount) + { + return null; + } + + var delaySeconds = 1 * (int)Math.Pow(2, retryCount - 1); + + return DateTime.Now.AddSeconds(delaySeconds); + } + protected virtual async Task> GetWaitingEventsAsync() { return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs new file mode 100644 index 0000000000..371fb00548 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs @@ -0,0 +1,21 @@ +namespace Volo.Abp.EventBus.Distributed; + +public enum InboxProcessorFailurePolicy +{ + /// + /// Default behavior, retry the following event in next period time. + /// + Retry, + + /// + /// Skip and retry the event in next period time, but with a delay. + /// The delay increases in every fail, and it is discarded after a specified amount of time + /// (e.g. 1 second, 2 seconds, 4 seconds, 8 seconds, etc.), + /// + RetryLater, + + /// + /// Skip the event and do not retry it. + /// + Discard, +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs new file mode 100644 index 0000000000..f3972fd1b3 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EventBus.Distributed; + +public class InboxProcessorOptions +{ + public InboxProcessorFailurePolicy FailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; + + public int MaxRetryCount { get; set; } = 10; +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs index a48492d1da..9ef7c87167 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs @@ -23,12 +23,17 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public bool Processed { get; set; } + public IncomingEventStatus Status { get; set; } - public DateTime? ProcessedTime { get; set; } + public DateTime? DiscardedOrProcessedTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; protected IncomingEventRecord() { + Status = IncomingEventStatus.Pending; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } @@ -41,7 +46,7 @@ public class IncomingEventRecord : EventName = eventInfo.EventName; EventData = eventInfo.EventData; CreationTime = eventInfo.CreationTime; - + Status = IncomingEventStatus.Pending; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); foreach (var property in eventInfo.ExtraProperties) @@ -70,7 +75,20 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { - Processed = true; - ProcessedTime = processedTime; + Status = IncomingEventStatus.Processed; + DiscardedOrProcessedTime = processedTime; + } + + public void MarkAsDiscarded(DateTime discardedTime) + { + Status = IncomingEventStatus.Discarded; + DiscardedOrProcessedTime = discardedTime; + } + + public void RetryLater(int retryCount, DateTime nextRetryTime) + { + Status = IncomingEventStatus.Pending; + NextRetryTime = nextRetryTime; + RetryCount = retryCount; } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index e5d726526b..1925f8280a 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -64,7 +64,7 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var outgoingEventRecords = await dbContext .IncomingEvents .AsQueryable() - .Where(x => x.Processed == false) + .Where(x => x.Status == IncomingEventStatus.Pending) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) @@ -81,7 +81,43 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var dbContext = await DbContextProvider.GetDbContextAsync(); var filter = Builders.Filter.Eq(x => x.Id, id); - var update = Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now); + var update = Builders.Update.Set(x => x.Status == IncomingEventStatus.Processed, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } + } + + [UnitOfWork] + public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.RetryCount, retryCount).Set(x => x.NextRetryTime, nextRetryTime); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } + } + + [UnitOfWork] + public virtual async Task MarkAsDiscardAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.Status == IncomingEventStatus.Discarded, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now); if (dbContext.SessionHandle != null) { @@ -108,11 +144,11 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb if (dbContext.SessionHandle != null) { - await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents); + await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents); } else { - await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents); + await dbContext.IncomingEvents.DeleteManyAsync(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents); } } } From 7ae4324808facb27109cf1e84d3dba895c5b1147 Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 22 Aug 2025 13:58:45 +0800 Subject: [PATCH 03/11] Refactor event handling to use HandledTime property --- .../DistributedEvents/DbContextEventInbox.cs | 4 ++-- .../DistributedEvents/IncomingEventRecord.cs | 12 +++++++---- .../EventBus/Distributed/IncomingEventInfo.cs | 13 +++++++++--- .../EventBus/Distributed/InboxProcessor.cs | 5 ++++- .../Distributed/InboxProcessorOptions.cs | 7 +++++++ .../DistributedEvents/IncomingEventRecord.cs | 20 ++++++++++++------- .../MongoDbContextEventInbox.cs | 4 ++-- 7 files changed, 46 insertions(+), 19 deletions(-) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 9a5ac3f154..856b8612be 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -66,7 +66,7 @@ public class DbContextEventInbox : IDbContextEventInbox { var dbContext = await DbContextProvider.GetDbContextAsync(); await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => - x.SetProperty(p => p.Status, _ => IncomingEventStatus.Processed).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now)); + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Processed).SetProperty(p => p.HandledTime, _ => Clock.Now)); } [UnitOfWork] @@ -84,7 +84,7 @@ public class DbContextEventInbox : IDbContextEventInbox { var dbContext = await DbContextProvider.GetDbContextAsync(); await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => - x.SetProperty(p => p.Status, _ => IncomingEventStatus.Discarded).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now)); + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Discarded).SetProperty(p => p.HandledTime, _ => Clock.Now)); } [UnitOfWork] diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index 4f01dfbe37..3f06bf4ef9 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -26,7 +26,7 @@ public class IncomingEventRecord : public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? DiscardedOrProcessedTime { get; set; } + public DateTime? HandledTime { get; set; } public int RetryCount { get; set; } = 0; @@ -62,7 +62,11 @@ public class IncomingEventRecord : MessageId, EventName, EventData, - CreationTime + CreationTime, + Status, + HandledTime, + RetryCount, + NextRetryTime ); foreach (var property in ExtraProperties) @@ -76,13 +80,13 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { Status = IncomingEventStatus.Processed; - DiscardedOrProcessedTime = processedTime; + HandledTime = processedTime; } public void MarkAsDiscarded(DateTime discardedTime) { Status = IncomingEventStatus.Discarded; - DiscardedOrProcessedTime = discardedTime; + HandledTime = discardedTime; } public void RetryLater(int retryCount, DateTime nextRetryTime) diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index 855d4a7302..23e59ad61a 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -22,7 +22,7 @@ public class IncomingEventInfo : IIncomingEventInfo public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? DiscardedOrProcessedTime { get; set; } + public DateTime? HandledTime { get; set; } public int RetryCount { get; set; } = 0; @@ -39,14 +39,21 @@ public class IncomingEventInfo : IIncomingEventInfo string messageId, string eventName, byte[] eventData, - DateTime creationTime) + DateTime creationTime, + IncomingEventStatus status = IncomingEventStatus.Pending, + DateTime? handledTime = null, + int retryCount = 0, + DateTime? nextRetryTime = null) { Id = id; MessageId = messageId; EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); EventData = eventData; CreationTime = creationTime; - + Status = status; + HandledTime = handledTime; + RetryCount = retryCount; + NextRetryTime = nextRetryTime; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index bbd203011a..88f9e18a29 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -105,6 +105,8 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency foreach (var waitingEvent in waitingEvents) { + Logger.LogInformation($"Start processing the incoming event with id = {waitingEvent.Id:N}"); + if (waitingEvent.NextRetryTime.HasValue && waitingEvent.NextRetryTime.Value > Clock.Now) { Logger.LogInformation($"Event with id = {waitingEvent.Id:N} is not ready to be processed yet. Next retry time: {waitingEvent.NextRetryTime.Value}"); @@ -139,7 +141,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { - if (waitingEvent.RetryCount >= InboxProcessorOptions.MaxRetryCount) + if (waitingEvent.RetryCount > InboxProcessorOptions.MaxRetryCount) { Logger.LogWarning($"Max retry count reached for event with id = {waitingEvent.Id:N}. Discarding the event."); @@ -163,6 +165,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { Logger.LogInformation($"Discarding event with id = {waitingEvent.Id:N} due to an error."); + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); await uow.CompleteAsync(StoppingToken); } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs index f3972fd1b3..7767945208 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs @@ -4,5 +4,12 @@ public class InboxProcessorOptions { public InboxProcessorFailurePolicy FailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; + /// + /// Retry intervals follow an exponential backoff strategy: + /// The delay for each retry is twice the previous one, + /// starting with an initial delay of 1 second. + /// For example: 1s, 2s, 4s, 8s, 16s, 32s ... + /// Maximum of 10 retries. + /// public int MaxRetryCount { get; set; } = 10; } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs index 9ef7c87167..0a3a1d4dd2 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs @@ -23,9 +23,9 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public IncomingEventStatus Status { get; set; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? DiscardedOrProcessedTime { get; set; } + public DateTime? HandledTime { get; set; } public int RetryCount { get; set; } = 0; @@ -33,7 +33,6 @@ public class IncomingEventRecord : protected IncomingEventRecord() { - Status = IncomingEventStatus.Pending; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } @@ -46,7 +45,10 @@ public class IncomingEventRecord : EventName = eventInfo.EventName; EventData = eventInfo.EventData; CreationTime = eventInfo.CreationTime; - Status = IncomingEventStatus.Pending; + Status = eventInfo.Status; + HandledTime = eventInfo.HandledTime; + RetryCount = eventInfo.RetryCount; + NextRetryTime = eventInfo.NextRetryTime; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); foreach (var property in eventInfo.ExtraProperties) @@ -62,7 +64,11 @@ public class IncomingEventRecord : MessageId, EventName, EventData, - CreationTime + CreationTime, + Status, + HandledTime, + RetryCount, + NextRetryTime ); foreach (var property in ExtraProperties) @@ -76,13 +82,13 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { Status = IncomingEventStatus.Processed; - DiscardedOrProcessedTime = processedTime; + HandledTime = processedTime; } public void MarkAsDiscarded(DateTime discardedTime) { Status = IncomingEventStatus.Discarded; - DiscardedOrProcessedTime = discardedTime; + HandledTime = discardedTime; } public void RetryLater(int retryCount, DateTime nextRetryTime) diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 1925f8280a..112f6d7cf7 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -81,7 +81,7 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var dbContext = await DbContextProvider.GetDbContextAsync(); var filter = Builders.Filter.Eq(x => x.Id, id); - var update = Builders.Update.Set(x => x.Status == IncomingEventStatus.Processed, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now); + var update = Builders.Update.Set(x => x.Status, IncomingEventStatus.Processed).Set(x => x.HandledTime, Clock.Now); if (dbContext.SessionHandle != null) { @@ -117,7 +117,7 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var dbContext = await DbContextProvider.GetDbContextAsync(); var filter = Builders.Filter.Eq(x => x.Id, id); - var update = Builders.Update.Set(x => x.Status == IncomingEventStatus.Discarded, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now); + var update = Builders.Update.Set(x => x.Status, IncomingEventStatus.Discarded).Set(x => x.HandledTime, Clock.Now); if (dbContext.SessionHandle != null) { From 79d62ccad964482f2c5599c2ce066f494a70a556 Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 22 Aug 2025 15:06:49 +0800 Subject: [PATCH 04/11] Rename InboxProcessorOptions to AbpInboxProcessorOptions --- ...rOptions.cs => AbpInboxProcessorOptions.cs} | 2 +- .../Abp/EventBus/Distributed/InboxProcessor.cs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) rename framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/{InboxProcessorOptions.cs => AbpInboxProcessorOptions.cs} (92%) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs similarity index 92% rename from framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs rename to framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs index 7767945208..32b209a94d 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs @@ -1,6 +1,6 @@ namespace Volo.Abp.EventBus.Distributed; -public class InboxProcessorOptions +public class AbpInboxProcessorOptions { public InboxProcessorFailurePolicy FailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 88f9e18a29..df137725b8 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -25,7 +25,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected IEventInbox Inbox { get; private set; } = default!; protected InboxConfig InboxConfig { get; private set; } = default!; protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } - protected InboxProcessorOptions InboxProcessorOptions { get; set; } + protected AbpInboxProcessorOptions AbpInboxProcessorOptions { get; set; } protected DateTime? LastCleanTime { get; set; } protected string DistributedLockName { get; set; } = default!; @@ -41,7 +41,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency IUnitOfWorkManager unitOfWorkManager, IClock clock, IOptions eventBusBoxesOptions, - IOptions inboxProcessorOptions) + IOptions inboxProcessorOptions) { ServiceProvider = serviceProvider; Timer = timer; @@ -50,7 +50,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; - InboxProcessorOptions = inboxProcessorOptions.Value; + AbpInboxProcessorOptions = inboxProcessorOptions.Value; Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds); Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; @@ -132,16 +132,16 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency { Logger.LogError(e, $"An error occurred while processing the incoming event with id = {waitingEvent.Id:N}"); - if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Retry) + if (AbpInboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Retry) { throw; } - if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.RetryLater) + if (AbpInboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.RetryLater) { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { - if (waitingEvent.RetryCount > InboxProcessorOptions.MaxRetryCount) + if (waitingEvent.RetryCount > AbpInboxProcessorOptions.MaxRetryCount) { Logger.LogWarning($"Max retry count reached for event with id = {waitingEvent.Id:N}. Discarding the event."); @@ -152,15 +152,15 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency Logger.LogInformation($"Retrying event with id = {waitingEvent.Id:N}. " + $"Retry count: {waitingEvent.RetryCount}, " + - $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)}"); + $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount, AbpInboxProcessorOptions.MaxRetryCount)}"); - await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount++, GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)); + await Inbox.RetryLaterAsync(waitingEvent.Id, ++waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount, AbpInboxProcessorOptions.MaxRetryCount)); await uow.CompleteAsync(StoppingToken); } continue; } - if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Discard) + if (AbpInboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Discard) { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { From 8e065517915dc90d86aaec0eb80880c16520e38d Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 22 Aug 2025 15:37:03 +0800 Subject: [PATCH 05/11] Refactor inbox retry policy configuration --- .../DistributedEvents/DbContextEventInbox.cs | 1 + .../Distributed/AbpEventBusBoxesOptions.cs | 10 +++++ .../Distributed/AbpInboxProcessorOptions.cs | 15 ------- .../EventBus/Distributed/InboxProcessor.cs | 43 +++++++------------ .../MongoDbContextEventInbox.cs | 1 + 5 files changed, 27 insertions(+), 43 deletions(-) delete mode 100644 framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 856b8612be..2e148a97c6 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -51,6 +51,7 @@ public class DbContextEventInbox : IDbContextEventInbox .IncomingEvents .AsNoTracking() .Where(x => x.Status == IncomingEventStatus.Pending) + .Where(x => x.NextRetryTime == null || x.NextRetryTime <= Clock.Now) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs index cc91cad5df..ec7b603176 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs @@ -36,6 +36,16 @@ public class AbpEventBusBoxesOptions /// public TimeSpan PeriodTimeSpan { get; set; } + /// + /// Default: + /// + public InboxProcessorFailurePolicy InboxProcessorFailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; + + /// + /// Default: 10 + /// + public int InboxProcessorMaxRetryCount { get; set; } = 10; + /// /// Default: 15 seconds /// diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs deleted file mode 100644 index 32b209a94d..0000000000 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpInboxProcessorOptions.cs +++ /dev/null @@ -1,15 +0,0 @@ -namespace Volo.Abp.EventBus.Distributed; - -public class AbpInboxProcessorOptions -{ - public InboxProcessorFailurePolicy FailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; - - /// - /// Retry intervals follow an exponential backoff strategy: - /// The delay for each retry is twice the previous one, - /// starting with an initial delay of 1 second. - /// For example: 1s, 2s, 4s, 8s, 16s, 32s ... - /// Maximum of 10 retries. - /// - public int MaxRetryCount { get; set; } = 10; -} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index df137725b8..3b61527bc5 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -25,7 +26,6 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected IEventInbox Inbox { get; private set; } = default!; protected InboxConfig InboxConfig { get; private set; } = default!; protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } - protected AbpInboxProcessorOptions AbpInboxProcessorOptions { get; set; } protected DateTime? LastCleanTime { get; set; } protected string DistributedLockName { get; set; } = default!; @@ -40,8 +40,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency IAbpDistributedLock distributedLock, IUnitOfWorkManager unitOfWorkManager, IClock clock, - IOptions eventBusBoxesOptions, - IOptions inboxProcessorOptions) + IOptions eventBusBoxesOptions) { ServiceProvider = serviceProvider; Timer = timer; @@ -50,7 +49,6 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; - AbpInboxProcessorOptions = inboxProcessorOptions.Value; Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds); Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; @@ -107,12 +105,6 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency { Logger.LogInformation($"Start processing the incoming event with id = {waitingEvent.Id:N}"); - if (waitingEvent.NextRetryTime.HasValue && waitingEvent.NextRetryTime.Value > Clock.Now) - { - Logger.LogInformation($"Event with id = {waitingEvent.Id:N} is not ready to be processed yet. Next retry time: {waitingEvent.NextRetryTime.Value}"); - continue; - } - try { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) @@ -130,41 +122,42 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } catch (Exception e) { - Logger.LogError(e, $"An error occurred while processing the incoming event with id = {waitingEvent.Id:N}"); + Logger.LogError(e, $"Event with id = {waitingEvent.Id:N} processing failed."); - if (AbpInboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Retry) + if (EventBusBoxesOptions.InboxProcessorFailurePolicy == InboxProcessorFailurePolicy.Retry) { throw; } - if (AbpInboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.RetryLater) + if (EventBusBoxesOptions.InboxProcessorFailurePolicy == InboxProcessorFailurePolicy.RetryLater) { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { - if (waitingEvent.RetryCount > AbpInboxProcessorOptions.MaxRetryCount) + if (waitingEvent.RetryCount > EventBusBoxesOptions.InboxProcessorMaxRetryCount) { - Logger.LogWarning($"Max retry count reached for event with id = {waitingEvent.Id:N}. Discarding the event."); + Logger.LogWarning($"Event with id = {waitingEvent.Id:N} has exceeded the maximum retry count. Marking it as discarded."); await Inbox.MarkAsDiscardAsync(waitingEvent.Id); await uow.CompleteAsync(StoppingToken); continue; } - Logger.LogInformation($"Retrying event with id = {waitingEvent.Id:N}. " + - $"Retry count: {waitingEvent.RetryCount}, " + - $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount, AbpInboxProcessorOptions.MaxRetryCount)}"); + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will retry later. " + + $"Current retry count: {waitingEvent.RetryCount}, " + + $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount)}" + + $"Max retry count: {EventBusBoxesOptions.InboxProcessorMaxRetryCount}."); - await Inbox.RetryLaterAsync(waitingEvent.Id, ++waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount, AbpInboxProcessorOptions.MaxRetryCount)); + await Inbox.RetryLaterAsync(waitingEvent.Id, ++waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount)); await uow.CompleteAsync(StoppingToken); } continue; } - if (AbpInboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Discard) + if (EventBusBoxesOptions.InboxProcessorFailurePolicy == InboxProcessorFailurePolicy.Discard) { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { - Logger.LogInformation($"Discarding event with id = {waitingEvent.Id:N} due to an error."); + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will be discarded."); await Inbox.MarkAsDiscardAsync(waitingEvent.Id); await uow.CompleteAsync(StoppingToken); @@ -187,15 +180,9 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } } - protected virtual DateTime? GetNextRetryTime(int retryCount, int maxRetryCount) + protected virtual DateTime? GetNextRetryTime(int retryCount) { - if (retryCount > maxRetryCount) - { - return null; - } - var delaySeconds = 1 * (int)Math.Pow(2, retryCount - 1); - return DateTime.Now.AddSeconds(delaySeconds); } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 112f6d7cf7..427c794542 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -65,6 +65,7 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb .IncomingEvents .AsQueryable() .Where(x => x.Status == IncomingEventStatus.Pending) + .Where(x => x.NextRetryTime == null || x.NextRetryTime <= Clock.Now) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) From 2f3520ab0329c318048e66070af017a6966c9ceb Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 22 Aug 2025 15:40:49 +0800 Subject: [PATCH 06/11] Fix log message formatting in InboxProcessor --- .../Volo/Abp/EventBus/Distributed/InboxProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 3b61527bc5..e4fa8a0b12 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -144,7 +144,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will retry later. " + $"Current retry count: {waitingEvent.RetryCount}, " + - $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount)}" + + $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount)}, " + $"Max retry count: {EventBusBoxesOptions.InboxProcessorMaxRetryCount}."); await Inbox.RetryLaterAsync(waitingEvent.Id, ++waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount)); From fe455f098993d6b816e6b9593cf39db3c71b8983 Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 22 Aug 2025 16:21:20 +0800 Subject: [PATCH 07/11] Fix retry logic and delay calculation in InboxProcessor --- .../Abp/EventBus/Distributed/InboxProcessor.cs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index e4fa8a0b12..10251ffedb 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -133,21 +133,29 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) { - if (waitingEvent.RetryCount > EventBusBoxesOptions.InboxProcessorMaxRetryCount) + if (waitingEvent.NextRetryTime != null) + { + waitingEvent.RetryCount++; + } + + if (waitingEvent.RetryCount >= EventBusBoxesOptions.InboxProcessorMaxRetryCount) { Logger.LogWarning($"Event with id = {waitingEvent.Id:N} has exceeded the maximum retry count. Marking it as discarded."); + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount, null); await Inbox.MarkAsDiscardAsync(waitingEvent.Id); await uow.CompleteAsync(StoppingToken); continue; } + waitingEvent.NextRetryTime = GetNextRetryTime(waitingEvent.RetryCount); + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will retry later. " + $"Current retry count: {waitingEvent.RetryCount}, " + - $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount)}, " + + $"Next retry time: {waitingEvent.NextRetryTime}, " + $"Max retry count: {EventBusBoxesOptions.InboxProcessorMaxRetryCount}."); - await Inbox.RetryLaterAsync(waitingEvent.Id, ++waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount)); + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount)); await uow.CompleteAsync(StoppingToken); } continue; @@ -182,7 +190,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected virtual DateTime? GetNextRetryTime(int retryCount) { - var delaySeconds = 1 * (int)Math.Pow(2, retryCount - 1); + var delaySeconds = (int)Math.Pow(2, retryCount); return DateTime.Now.AddSeconds(delaySeconds); } From 8948d29f52705ea8ab24dd64f58424e0d5c5351d Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 22 Aug 2025 17:07:40 +0800 Subject: [PATCH 08/11] Document inbox processor failure policy options --- .../framework/infrastructure/event-bus/distributed/index.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/en/framework/infrastructure/event-bus/distributed/index.md b/docs/en/framework/infrastructure/event-bus/distributed/index.md index d56215fa12..bef921c851 100644 --- a/docs/en/framework/infrastructure/event-bus/distributed/index.md +++ b/docs/en/framework/infrastructure/event-bus/distributed/index.md @@ -643,6 +643,11 @@ Configure(options => * `InboxWaitingEventMaxCount`: The maximum number of events to query at once from the inbox in the database. Default value is 1000. * `OutboxWaitingEventMaxCount`: The maximum number of events to query at once from the outbox in the database. Default value is 1000. * `DistributedLockWaitDuration`: ABP uses [distributed locking](../../distributed-locking.md) to prevent concurrent access to the inbox and outbox messages in the database, when running multiple instance of the same application. If an instance of the application can not obtain the lock, it tries after a duration. This is the configuration of that duration. Default value is 15 seconds (`TimeSpan.FromSeconds(15)`). +* `InboxProcessorFailurePolicy`: The policy to handle the failure of the inbox processor. Default value is `Retry`. Possible values are: + * `Retry`: The current exception and subsequent events will continue to be processed in order in the next cycle. + * `RetryLater`: Skip the event that caused the exception and continue with the following events. The failed event will be retried after a delay that doubles each time (1, 2, 4, 8, 16 seconds). The default maximum retry count is 10 (configurable). + * `Discard`: The event that caused the exception will be discarded and will not be retried. +* `InboxProcessorMaxRetryCount`: The maximum number of retries for the inbox processor. Default value is 10. Only used when `InboxProcessorFailurePolicy` is `RetryLater`. ### Skipping Outbox From 2f98f7fea08f04856a3e15fd0e267e596c805597 Mon Sep 17 00:00:00 2001 From: maliming Date: Thu, 4 Sep 2025 10:07:29 +0800 Subject: [PATCH 09/11] Add configurable retry backoff for inbox processor Introduces the InboxProcessorRetryBackoffFactor option to allow configuration of the initial retry delay for the inbox processor when using the RetryLater failure policy. Updates documentation and logic to use this factor for exponential backoff, providing more flexibility in handling event retries. --- .../infrastructure/event-bus/distributed/index.md | 4 ++-- .../Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs | 7 +++++++ .../Volo/Abp/EventBus/Distributed/InboxProcessor.cs | 8 ++++---- .../EventBus/Distributed/InboxProcessorFailurePolicy.cs | 7 ++++--- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/docs/en/framework/infrastructure/event-bus/distributed/index.md b/docs/en/framework/infrastructure/event-bus/distributed/index.md index bef921c851..f379a44f40 100644 --- a/docs/en/framework/infrastructure/event-bus/distributed/index.md +++ b/docs/en/framework/infrastructure/event-bus/distributed/index.md @@ -645,9 +645,9 @@ Configure(options => * `DistributedLockWaitDuration`: ABP uses [distributed locking](../../distributed-locking.md) to prevent concurrent access to the inbox and outbox messages in the database, when running multiple instance of the same application. If an instance of the application can not obtain the lock, it tries after a duration. This is the configuration of that duration. Default value is 15 seconds (`TimeSpan.FromSeconds(15)`). * `InboxProcessorFailurePolicy`: The policy to handle the failure of the inbox processor. Default value is `Retry`. Possible values are: * `Retry`: The current exception and subsequent events will continue to be processed in order in the next cycle. - * `RetryLater`: Skip the event that caused the exception and continue with the following events. The failed event will be retried after a delay that doubles each time (1, 2, 4, 8, 16 seconds). The default maximum retry count is 10 (configurable). + * `RetryLater`: Skip the event that caused the exception and continue with the following events. The failed event will be retried after a delay that doubles with each retry, starting from the configured `InboxProcessorRetryBackoffFactor` (e.g., 10, 20, 40, 80 seconds). The default maximum retry count is 10 (configurable). Discard the event if it still fails after reaching the maximum retry count. * `Discard`: The event that caused the exception will be discarded and will not be retried. -* `InboxProcessorMaxRetryCount`: The maximum number of retries for the inbox processor. Default value is 10. Only used when `InboxProcessorFailurePolicy` is `RetryLater`. +* `InboxProcessorRetryBackoffFactor`: The initial retry delay factor (double) used when `InboxProcessorFailurePolicy` is `RetryLater`. The retry delay is calculated as: `delay = InboxProcessorRetryBackoffFactor × 2^retryCount`. Default value is `10`. ### Skipping Outbox diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs index ec7b603176..6febf2f065 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs @@ -46,6 +46,13 @@ public class AbpEventBusBoxesOptions /// public int InboxProcessorMaxRetryCount { get; set; } = 10; + /// + /// Default value is 10 + /// The initial retry delay factor (double) when `InboxProcessorFailurePolicy` is `RetryLater`. + /// The delay is calculated as: `delay = InboxProcessorRetryBackoffFactor × 2^retryCount` + /// + public double InboxProcessorRetryBackoffFactor { get; set; } = 10; + /// /// Default: 15 seconds /// diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 10251ffedb..b00ba81242 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -148,14 +148,14 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency continue; } - waitingEvent.NextRetryTime = GetNextRetryTime(waitingEvent.RetryCount); + waitingEvent.NextRetryTime = GetNextRetryTime(waitingEvent.RetryCount, EventBusBoxesOptions.InboxProcessorRetryBackoffFactor); Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will retry later. " + $"Current retry count: {waitingEvent.RetryCount}, " + $"Next retry time: {waitingEvent.NextRetryTime}, " + $"Max retry count: {EventBusBoxesOptions.InboxProcessorMaxRetryCount}."); - await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount)); + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount, EventBusBoxesOptions.InboxProcessorRetryBackoffFactor)); await uow.CompleteAsync(StoppingToken); } continue; @@ -188,9 +188,9 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } } - protected virtual DateTime? GetNextRetryTime(int retryCount) + protected virtual DateTime? GetNextRetryTime(int retryCount, double factor) { - var delaySeconds = (int)Math.Pow(2, retryCount); + var delaySeconds = factor * Math.Pow(2, retryCount); return DateTime.Now.AddSeconds(delaySeconds); } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs index 371fb00548..31095b62a3 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs @@ -8,9 +8,10 @@ public enum InboxProcessorFailurePolicy Retry, /// - /// Skip and retry the event in next period time, but with a delay. - /// The delay increases in every fail, and it is discarded after a specified amount of time - /// (e.g. 1 second, 2 seconds, 4 seconds, 8 seconds, etc.), + /// Skip the failed event and retry it after a delay. + /// The delay doubles with each retry, starting from the configured InboxProcessorRetryBackoffFactor + /// (e.g., 10, 20, 40, 80 seconds, etc.). + /// The event is discarded if it still fails after reaching the maximum retry count. /// RetryLater, From 8a15eee94f2336983621b0c91e33b9ecfe540b1c Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 15 Sep 2025 16:23:48 +0800 Subject: [PATCH 10/11] Optimize Clock.Now usage in user delegation repositories --- .../EfCoreIdentityUserDelegationRepository.cs | 10 ++++++---- .../MongoDB/MongoIdentityUserDelegationRepository.cs | 8 +++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs b/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs index a21e0f8ea0..81953c54f4 100644 --- a/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs +++ b/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs @@ -31,22 +31,24 @@ public class EfCoreIdentityUserDelegationRepository : EfCoreRepository> GetActiveDelegationsAsync(Guid targetUserId, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetDbSetAsync()) .AsNoTracking() .Where(x => x.TargetUserId == targetUserId && - x.StartTime <= Clock.Now && - x.EndTime >= Clock.Now) + x.StartTime <= now && + x.EndTime >= now) .ToListAsync(cancellationToken: cancellationToken); } public virtual async Task FindActiveDelegationByIdAsync(Guid id, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetDbSetAsync()) .AsNoTracking() .FirstOrDefaultAsync(x => x.Id == id && - x.StartTime <= Clock.Now && - x.EndTime >= Clock.Now + x.StartTime <= now && + x.EndTime >= now , cancellationToken: GetCancellationToken(cancellationToken)); } } diff --git a/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs b/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs index 1855069b4a..bd96ed4164 100644 --- a/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs +++ b/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs @@ -32,19 +32,21 @@ public class MongoIdentityUserDelegationRepository : MongoDbRepository> GetActiveDelegationsAsync(Guid targetUserId, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetQueryableAsync(cancellationToken)) .Where(x => x.TargetUserId == targetUserId) - .Where(x => x.StartTime <= Clock.Now && x.EndTime >= Clock.Now) + .Where(x => x.StartTime <= now && x.EndTime >= now) .ToListAsync(cancellationToken: cancellationToken); } public virtual async Task FindActiveDelegationByIdAsync(Guid id, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetQueryableAsync(cancellationToken)) .FirstOrDefaultAsync(x => x.Id == id && - x.StartTime <= Clock.Now && - x.EndTime >= Clock.Now + x.StartTime <= now && + x.EndTime >= now , cancellationToken: GetCancellationToken(cancellationToken)); } } From 6b14fe73f3a9e82f0e13bb800bd174a4cf105ea9 Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 15 Sep 2025 16:25:13 +0800 Subject: [PATCH 11/11] Optimize event retry time evaluation in inbox classes --- .../DistributedEvents/DbContextEventInbox.cs | 3 ++- .../Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 2e148a97c6..82fd3b5d2e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -47,11 +47,12 @@ public class DbContextEventInbox : IDbContextEventInbox transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; } + var now = Clock.Now; var outgoingEventRecords = await dbContext .IncomingEvents .AsNoTracking() .Where(x => x.Status == IncomingEventStatus.Pending) - .Where(x => x.NextRetryTime == null || x.NextRetryTime <= Clock.Now) + .Where(x => x.NextRetryTime == null || x.NextRetryTime <= now) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 427c794542..8baeb82f3a 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -61,11 +61,12 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; } + var now = Clock.Now; var outgoingEventRecords = await dbContext .IncomingEvents .AsQueryable() .Where(x => x.Status == IncomingEventStatus.Pending) - .Where(x => x.NextRetryTime == null || x.NextRetryTime <= Clock.Now) + .Where(x => x.NextRetryTime == null || x.NextRetryTime <= now) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount)