Browse Source

Added retry and discard feature.

pull/23563/head
maliming 9 months ago
parent
commit
1292074763
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 24
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  2. 2
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs
  3. 25
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs
  4. 4
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs
  5. 9
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs
  6. 10
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs
  7. 63
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs
  8. 21
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs
  9. 8
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs
  10. 28
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs
  11. 44
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs

24
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -50,7 +50,7 @@ public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsNoTracking()
.Where(x => !x.Processed)
.Where(x => x.Status == IncomingEventStatus.Pending)
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
@ -66,7 +66,25 @@ public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
{
var dbContext = await DbContextProvider.GetDbContextAsync();
await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x =>
x.SetProperty(p => p.Processed, _ => true).SetProperty(p => p.ProcessedTime, _ => Clock.Now));
x.SetProperty(p => p.Status, _ => IncomingEventStatus.Processed).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now));
}
[UnitOfWork]
public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x =>
x.SetProperty(p => p.RetryCount, _ => retryCount)
.SetProperty(p => p.NextRetryTime, _ => nextRetryTime)
.SetProperty(p => p.Status, _ => IncomingEventStatus.Pending));
}
[UnitOfWork]
public virtual async Task MarkAsDiscardAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x =>
x.SetProperty(p => p.Status, _ => IncomingEventStatus.Discarded).SetProperty(p => p.DiscardedOrProcessedTime, _ => Clock.Now));
}
[UnitOfWork]
@ -82,7 +100,7 @@ public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
await dbContext.IncomingEvents
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents)
.Where(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents)
.ExecuteDeleteAsync();
}
}

2
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs

@ -16,7 +16,7 @@ public static class EventInboxDbContextModelBuilderExtensions
b.Property(x => x.EventName).IsRequired().HasMaxLength(IncomingEventRecord.MaxEventNameLength);
b.Property(x => x.EventData).IsRequired();
b.HasIndex(x => new { x.Processed, x.CreationTime });
b.HasIndex(x => new { x.Status, x.CreationTime });
b.HasIndex(x => x.MessageId);
});
}

25
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs

@ -24,9 +24,13 @@ public class IncomingEventRecord :
public DateTime CreationTime { get; private set; }
public bool Processed { get; set; }
public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending;
public DateTime? ProcessedTime { get; set; }
public DateTime? DiscardedOrProcessedTime { get; set; }
public int RetryCount { get; set; } = 0;
public DateTime? NextRetryTime { get; set; } = null;
protected IncomingEventRecord()
{
@ -71,7 +75,20 @@ public class IncomingEventRecord :
public void MarkAsProcessed(DateTime processedTime)
{
Processed = true;
ProcessedTime = processedTime;
Status = IncomingEventStatus.Processed;
DiscardedOrProcessedTime = processedTime;
}
public void MarkAsDiscarded(DateTime discardedTime)
{
Status = IncomingEventStatus.Discarded;
DiscardedOrProcessedTime = discardedTime;
}
public void RetryLater(int retryCount, DateTime nextRetryTime)
{
Status = IncomingEventStatus.Pending;
NextRetryTime = nextRetryTime;
RetryCount = retryCount;
}
}

4
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs

@ -14,6 +14,10 @@ public interface IEventInbox
Task MarkAsProcessedAsync(Guid id);
Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime);
Task MarkAsDiscardAsync(Guid id);
Task<bool> ExistsByMessageIdAsync(string messageId);
Task DeleteOldEventsAsync();

9
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs

@ -20,6 +20,14 @@ public class IncomingEventInfo : IIncomingEventInfo
public DateTime CreationTime { get; }
public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending;
public DateTime? DiscardedOrProcessedTime { get; set; }
public int RetryCount { get; set; } = 0;
public DateTime? NextRetryTime { get; set; } = null;
protected IncomingEventInfo()
{
ExtraProperties = new ExtraPropertyDictionary();
@ -38,6 +46,7 @@ public class IncomingEventInfo : IIncomingEventInfo
EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength);
EventData = eventData;
CreationTime = creationTime;
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}

10
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs

@ -0,0 +1,10 @@
namespace Volo.Abp.EventBus.Distributed;
public enum IncomingEventStatus
{
Pending = 0,
Discarded = 1,
Processed = 2
}

63
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs

@ -25,7 +25,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
protected IEventInbox Inbox { get; private set; } = default!;
protected InboxConfig InboxConfig { get; private set; } = default!;
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected InboxProcessorOptions InboxProcessorOptions { get; set; }
protected DateTime? LastCleanTime { get; set; }
protected string DistributedLockName { get; set; } = default!;
@ -40,7 +40,8 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
IAbpDistributedLock distributedLock,
IUnitOfWorkManager unitOfWorkManager,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions,
IOptions<InboxProcessorOptions> inboxProcessorOptions)
{
ServiceProvider = serviceProvider;
Timer = timer;
@ -49,6 +50,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
UnitOfWorkManager = unitOfWorkManager;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
InboxProcessorOptions = inboxProcessorOptions.Value;
Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds);
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
@ -103,6 +105,12 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
foreach (var waitingEvent in waitingEvents)
{
if (waitingEvent.NextRetryTime.HasValue && waitingEvent.NextRetryTime.Value > Clock.Now)
{
Logger.LogInformation($"Event with id = {waitingEvent.Id:N} is not ready to be processed yet. Next retry time: {waitingEvent.NextRetryTime.Value}");
continue;
}
try
{
using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true))
@ -121,6 +129,45 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
catch (Exception e)
{
Logger.LogError(e, $"An error occurred while processing the incoming event with id = {waitingEvent.Id:N}");
if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Retry)
{
throw;
}
if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.RetryLater)
{
using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true))
{
if (waitingEvent.RetryCount >= InboxProcessorOptions.MaxRetryCount)
{
Logger.LogWarning($"Max retry count reached for event with id = {waitingEvent.Id:N}. Discarding the event.");
await Inbox.MarkAsDiscardAsync(waitingEvent.Id);
await uow.CompleteAsync(StoppingToken);
continue;
}
Logger.LogInformation($"Retrying event with id = {waitingEvent.Id:N}. " +
$"Retry count: {waitingEvent.RetryCount}, " +
$"Next retry time: {GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount)}");
await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount++, GetNextRetryTime(waitingEvent.RetryCount, InboxProcessorOptions.MaxRetryCount));
await uow.CompleteAsync(StoppingToken);
}
continue;
}
if (InboxProcessorOptions.FailurePolicy == InboxProcessorFailurePolicy.Discard)
{
using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true))
{
Logger.LogInformation($"Discarding event with id = {waitingEvent.Id:N} due to an error.");
await Inbox.MarkAsDiscardAsync(waitingEvent.Id);
await uow.CompleteAsync(StoppingToken);
}
continue;
}
}
}
}
@ -137,6 +184,18 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
}
}
protected virtual DateTime? GetNextRetryTime(int retryCount, int maxRetryCount)
{
if (retryCount > maxRetryCount)
{
return null;
}
var delaySeconds = 1 * (int)Math.Pow(2, retryCount - 1);
return DateTime.Now.AddSeconds(delaySeconds);
}
protected virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync()
{
return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken);

21
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs

@ -0,0 +1,21 @@
namespace Volo.Abp.EventBus.Distributed;
public enum InboxProcessorFailurePolicy
{
/// <summary>
/// Default behavior, retry the following event in next period time.
/// </summary>
Retry,
/// <summary>
/// Skip and retry the event in next period time, but with a delay.
/// The delay increases in every fail, and it is discarded after a specified amount of time
/// (e.g. 1 second, 2 seconds, 4 seconds, 8 seconds, etc.),
/// </summary>
RetryLater,
/// <summary>
/// Skip the event and do not retry it.
/// </summary>
Discard,
}

8
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorOptions.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EventBus.Distributed;
public class InboxProcessorOptions
{
public InboxProcessorFailurePolicy FailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry;
public int MaxRetryCount { get; set; } = 10;
}

28
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs

@ -23,12 +23,17 @@ public class IncomingEventRecord :
public DateTime CreationTime { get; private set; }
public bool Processed { get; set; }
public IncomingEventStatus Status { get; set; }
public DateTime? ProcessedTime { get; set; }
public DateTime? DiscardedOrProcessedTime { get; set; }
public int RetryCount { get; set; } = 0;
public DateTime? NextRetryTime { get; set; } = null;
protected IncomingEventRecord()
{
Status = IncomingEventStatus.Pending;
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
@ -41,7 +46,7 @@ public class IncomingEventRecord :
EventName = eventInfo.EventName;
EventData = eventInfo.EventData;
CreationTime = eventInfo.CreationTime;
Status = IncomingEventStatus.Pending;
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
foreach (var property in eventInfo.ExtraProperties)
@ -70,7 +75,20 @@ public class IncomingEventRecord :
public void MarkAsProcessed(DateTime processedTime)
{
Processed = true;
ProcessedTime = processedTime;
Status = IncomingEventStatus.Processed;
DiscardedOrProcessedTime = processedTime;
}
public void MarkAsDiscarded(DateTime discardedTime)
{
Status = IncomingEventStatus.Discarded;
DiscardedOrProcessedTime = discardedTime;
}
public void RetryLater(int retryCount, DateTime nextRetryTime)
{
Status = IncomingEventStatus.Pending;
NextRetryTime = nextRetryTime;
RetryCount = retryCount;
}
}

44
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs

@ -64,7 +64,7 @@ public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInb
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsQueryable()
.Where(x => x.Processed == false)
.Where(x => x.Status == IncomingEventStatus.Pending)
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
@ -81,7 +81,43 @@ public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInb
var dbContext = await DbContextProvider.GetDbContextAsync();
var filter = Builders<IncomingEventRecord>.Filter.Eq(x => x.Id, id);
var update = Builders<IncomingEventRecord>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now);
var update = Builders<IncomingEventRecord>.Update.Set(x => x.Status == IncomingEventStatus.Processed, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now);
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update);
}
else
{
await dbContext.IncomingEvents.UpdateOneAsync(filter, update);
}
}
[UnitOfWork]
public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var filter = Builders<IncomingEventRecord>.Filter.Eq(x => x.Id, id);
var update = Builders<IncomingEventRecord>.Update.Set(x => x.RetryCount, retryCount).Set(x => x.NextRetryTime, nextRetryTime);
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update);
}
else
{
await dbContext.IncomingEvents.UpdateOneAsync(filter, update);
}
}
[UnitOfWork]
public virtual async Task MarkAsDiscardAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var filter = Builders<IncomingEventRecord>.Filter.Eq(x => x.Id, id);
var update = Builders<IncomingEventRecord>.Update.Set(x => x.Status == IncomingEventStatus.Discarded, true).Set(x => x.DiscardedOrProcessedTime, Clock.Now);
if (dbContext.SessionHandle != null)
{
@ -108,11 +144,11 @@ public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInb
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents);
await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents);
}
else
{
await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents);
await dbContext.IncomingEvents.DeleteManyAsync(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents);
}
}
}

Loading…
Cancel
Save