diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 54ddfd0e5e..9a5ac3f154 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -50,7 +50,7 @@ public class DbContextEventInbox : IDbContextEventInbox var outgoingEventRecords = await dbContext .IncomingEvents .AsNoTracking() - .Where(x => !x.Processed) + .Where(x => x.Status == IncomingEventStatus.Pending) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) @@ -66,7 +66,25 @@ public class DbContextEventInbox : IDbContextEventInbox { var dbContext = await DbContextProvider.GetDbContextAsync(); await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => - x.SetProperty(p => p.Processed, _ => true).SetProperty(p => p.ProcessedTime, _ => Clock.Now)); + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Processed).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now)); + } + + [UnitOfWork] + public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => + x.SetProperty(p => p.RetryCount, _ => retryCount) + .SetProperty(p => p.NextRetryTime, _ => nextRetryTime) + .SetProperty(p => p.Status, _ => IncomingEventStatus.Pending)); + } + + [UnitOfWork] + public virtual async Task MarkAsDiscardAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Discarded).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now)); } [UnitOfWork] @@ -82,7 +100,7 @@ public class DbContextEventInbox : IDbContextEventInbox var dbContext = await DbContextProvider.GetDbContextAsync(); var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; await dbContext.IncomingEvents - .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) + .Where(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents) .ExecuteDeleteAsync(); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs index fbe91c2def..b7474c3b11 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs @@ -16,7 +16,7 @@ public static class EventInboxDbContextModelBuilderExtensions b.Property(x => x.EventName).IsRequired().HasMaxLength(IncomingEventRecord.MaxEventNameLength); b.Property(x => x.EventData).IsRequired(); - b.HasIndex(x => new { x.Processed, x.CreationTime }); + b.HasIndex(x => new { x.Status, x.CreationTime }); b.HasIndex(x => x.MessageId); }); } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index be7da15890..4f01dfbe37 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -24,9 +24,13 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public bool Processed { get; set; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? ProcessedTime { get; set; } + public DateTime? DiscardedOrProcessedTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; protected IncomingEventRecord() { @@ -71,7 +75,20 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { - Processed = true; - ProcessedTime = processedTime; + Status = IncomingEventStatus.Processed; + DiscardedOrProcessedTime = processedTime; + } + + public void MarkAsDiscarded(DateTime discardedTime) + { + Status = IncomingEventStatus.Discarded; + DiscardedOrProcessedTime = discardedTime; + } + + public void RetryLater(int retryCount, DateTime nextRetryTime) + { + Status = IncomingEventStatus.Pending; + NextRetryTime = nextRetryTime; + RetryCount = retryCount; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs index c154330495..66b605d576 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -14,6 +14,10 @@ public interface IEventInbox Task MarkAsProcessedAsync(Guid id); + Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime); + + Task MarkAsDiscardAsync(Guid id); + Task ExistsByMessageIdAsync(string messageId); Task DeleteOldEventsAsync(); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index 1be24a3d02..855d4a7302 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -20,6 +20,14 @@ public class IncomingEventInfo : IIncomingEventInfo public DateTime CreationTime { get; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; + + public DateTime? DiscardedOrProcessedTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; + protected IncomingEventInfo() { ExtraProperties = new ExtraPropertyDictionary(); @@ -38,6 +46,7 @@ public class IncomingEventInfo : IIncomingEventInfo EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); EventData = eventData; CreationTime = creationTime; + ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs new file mode 100644 index 0000000000..27a210770c --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs @@ -0,0 +1,10 @@ +namespace Volo.Abp.EventBus.Distributed; + +public enum IncomingEventStatus +{ + Pending = 0, + + Discarded = 1, + + Processed = 2 +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index f0d86627ae..bbd203011a 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -25,7 +25,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected IEventInbox Inbox { get; private set; } = default!; protected InboxConfig InboxConfig { get; private set; } = default!; protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } - + protected InboxProcessorOptions InboxProcessorOptions { get; set; } protected DateTime? LastCleanTime { get; set; } protected string DistributedLockName { get; set; } = default!; @@ -40,7 +40,8 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency IAbpDistributedLock distributedLock, IUnitOfWorkManager unitOfWorkManager, IClock clock, - IOptions eventBusBoxesOptions) + IOptions eventBusBoxesOptions, + IOptions inboxProcessorOptions) { ServiceProvider = serviceProvider; Timer = timer; @@ -49,6 +50,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; + InboxProcessorOptions = inboxProcessorOptions.Value; Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds); Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; @@ -103,6 +105,12 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency foreach (var waitingEvent in waitingEvents) { + if (waitingEvent.NextRetryTime.HasValue && waitingEvent.NextRetryTime.Value > Clock.Now) + { + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} is not ready to be processed yet. Next retry time: {waitingEvent.NextRetryTime.Value}"); + continue; + } + try { using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) @@ -121,6 +129,45 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency catch (Exception e) { Logger.LogError(e, $"An error occurred while processing the incoming event with id = {waitingEvent.Id:N}"); + + if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Retry) + { + throw; + } + + if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.RetryLater) + { + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + if (waitingEvent.RetryCount >= InboxProcessorOptions.MaxRetryCount) + { + Logger.LogWarning($"Max retry count reached for event with id = {waitingEvent.Id:N}. Discarding the event."); + + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); + await uow.CompleteAsync(StoppingToken); + continue; + } + + Logger.LogInformation($"Retrying event with id = {waitingEvent.Id:N}. " + + $"Retry count: {waitingEvent.RetryCount}, " + + $"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)}"); + + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount++, GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)); + await uow.CompleteAsync(StoppingToken); + } + continue; + } + + if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Discard) + { + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + Logger.LogInformation($"Discarding event with id = {waitingEvent.Id:N} due to an error."); + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); + await uow.CompleteAsync(StoppingToken); + } + continue; + } } } } @@ -137,6 +184,18 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } } + protected virtual DateTime? GetNextRetryTime(int retryCount, int maxRetryCount) + { + if (retryCount > maxRetryCount) + { + return null; + } + + var delaySeconds = 1 * (int)Math.Pow(2, retryCount - 1); + + return DateTime.Now.AddSeconds(delaySeconds); + } + protected virtual async Task> GetWaitingEventsAsync() { return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs new file mode 100644 index 0000000000..371fb00548 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs @@ -0,0 +1,21 @@ +namespace Volo.Abp.EventBus.Distributed; + +public enum InboxProcessorFailurePolicy +{ + /// + /// Default behavior, retry the following event in next period time. + /// + Retry, + + /// + /// Skip and retry the event in next period time, but with a delay. + /// The delay increases in every fail, and it is discarded after a specified amount of time + /// (e.g. 1 second, 2 seconds, 4 seconds, 8 seconds, etc.), + /// + RetryLater, + + /// + /// Skip the event and do not retry it. + /// + Discard, +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs new file mode 100644 index 0000000000..f3972fd1b3 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EventBus.Distributed; + +public class InboxProcessorOptions +{ + public InboxProcessorFailurePolicy FailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; + + public int MaxRetryCount { get; set; } = 10; +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs index a48492d1da..9ef7c87167 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs @@ -23,12 +23,17 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public bool Processed { get; set; } + public IncomingEventStatus Status { get; set; } - public DateTime? ProcessedTime { get; set; } + public DateTime? DiscardedOrProcessedTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; protected IncomingEventRecord() { + Status = IncomingEventStatus.Pending; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } @@ -41,7 +46,7 @@ public class IncomingEventRecord : EventName = eventInfo.EventName; EventData = eventInfo.EventData; CreationTime = eventInfo.CreationTime; - + Status = IncomingEventStatus.Pending; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); foreach (var property in eventInfo.ExtraProperties) @@ -70,7 +75,20 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { - Processed = true; - ProcessedTime = processedTime; + Status = IncomingEventStatus.Processed; + DiscardedOrProcessedTime = processedTime; + } + + public void MarkAsDiscarded(DateTime discardedTime) + { + Status = IncomingEventStatus.Discarded; + DiscardedOrProcessedTime = discardedTime; + } + + public void RetryLater(int retryCount, DateTime nextRetryTime) + { + Status = IncomingEventStatus.Pending; + NextRetryTime = nextRetryTime; + RetryCount = retryCount; } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index e5d726526b..1925f8280a 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -64,7 +64,7 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var outgoingEventRecords = await dbContext .IncomingEvents .AsQueryable() - .Where(x => x.Processed == false) + .Where(x => x.Status == IncomingEventStatus.Pending) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) @@ -81,7 +81,43 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var dbContext = await DbContextProvider.GetDbContextAsync(); var filter = Builders.Filter.Eq(x => x.Id, id); - var update = Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now); + var update = Builders.Update.Set(x => x.Status == IncomingEventStatus.Processed, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } + } + + [UnitOfWork] + public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.RetryCount, retryCount).Set(x => x.NextRetryTime, nextRetryTime); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } + } + + [UnitOfWork] + public virtual async Task MarkAsDiscardAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.Status == IncomingEventStatus.Discarded, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now); if (dbContext.SessionHandle != null) { @@ -108,11 +144,11 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb if (dbContext.SessionHandle != null) { - await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents); + await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents); } else { - await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents); + await dbContext.IncomingEvents.DeleteManyAsync(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents); } } }