diff --git a/docs/en/framework/infrastructure/event-bus/distributed/index.md b/docs/en/framework/infrastructure/event-bus/distributed/index.md index d56215fa12..f379a44f40 100644 --- a/docs/en/framework/infrastructure/event-bus/distributed/index.md +++ b/docs/en/framework/infrastructure/event-bus/distributed/index.md @@ -643,6 +643,11 @@ Configure(options => * `InboxWaitingEventMaxCount`: The maximum number of events to query at once from the inbox in the database. Default value is 1000. * `OutboxWaitingEventMaxCount`: The maximum number of events to query at once from the outbox in the database. Default value is 1000. * `DistributedLockWaitDuration`: ABP uses [distributed locking](../../distributed-locking.md) to prevent concurrent access to the inbox and outbox messages in the database, when running multiple instance of the same application. If an instance of the application can not obtain the lock, it tries after a duration. This is the configuration of that duration. Default value is 15 seconds (`TimeSpan.FromSeconds(15)`). +* `InboxProcessorFailurePolicy`: The policy to handle the failure of the inbox processor. Default value is `Retry`. Possible values are: + * `Retry`: The current exception and subsequent events will continue to be processed in order in the next cycle. + * `RetryLater`: Skip the event that caused the exception and continue with the following events. The failed event will be retried after a delay that doubles with each retry, starting from the configured `InboxProcessorRetryBackoffFactor` (e.g., 10, 20, 40, 80 seconds). The default maximum retry count is 10 (configurable). Discard the event if it still fails after reaching the maximum retry count. + * `Discard`: The event that caused the exception will be discarded and will not be retried. +* `InboxProcessorRetryBackoffFactor`: The initial retry delay factor (double) used when `InboxProcessorFailurePolicy` is `RetryLater`. The retry delay is calculated as: `delay = InboxProcessorRetryBackoffFactor × 2^retryCount`. Default value is `10`. ### Skipping Outbox diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 54ddfd0e5e..82fd3b5d2e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -47,10 +47,12 @@ public class DbContextEventInbox : IDbContextEventInbox transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; } + var now = Clock.Now; var outgoingEventRecords = await dbContext .IncomingEvents .AsNoTracking() - .Where(x => !x.Processed) + .Where(x => x.Status == IncomingEventStatus.Pending) + .Where(x => x.NextRetryTime == null || x.NextRetryTime <= now) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) @@ -66,7 +68,25 @@ public class DbContextEventInbox : IDbContextEventInbox { var dbContext = await DbContextProvider.GetDbContextAsync(); await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => - x.SetProperty(p => p.Processed, _ => true).SetProperty(p => p.ProcessedTime, _ => Clock.Now)); + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Processed).SetProperty(p => p.HandledTime, _ => Clock.Now)); + } + + [UnitOfWork] + public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => + x.SetProperty(p => p.RetryCount, _ => retryCount) + .SetProperty(p => p.NextRetryTime, _ => nextRetryTime) + .SetProperty(p => p.Status, _ => IncomingEventStatus.Pending)); + } + + [UnitOfWork] + public virtual async Task MarkAsDiscardAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x => + x.SetProperty(p => p.Status, _ => IncomingEventStatus.Discarded).SetProperty(p => p.HandledTime, _ => Clock.Now)); } [UnitOfWork] @@ -82,7 +102,7 @@ public class DbContextEventInbox : IDbContextEventInbox var dbContext = await DbContextProvider.GetDbContextAsync(); var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; await dbContext.IncomingEvents - .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) + .Where(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents) .ExecuteDeleteAsync(); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs index fbe91c2def..b7474c3b11 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EventInboxDbContextModelBuilderExtensions.cs @@ -16,7 +16,7 @@ public static class EventInboxDbContextModelBuilderExtensions b.Property(x => x.EventName).IsRequired().HasMaxLength(IncomingEventRecord.MaxEventNameLength); b.Property(x => x.EventData).IsRequired(); - b.HasIndex(x => new { x.Processed, x.CreationTime }); + b.HasIndex(x => new { x.Status, x.CreationTime }); b.HasIndex(x => x.MessageId); }); } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index be7da15890..3f06bf4ef9 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -24,9 +24,13 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public bool Processed { get; set; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? ProcessedTime { get; set; } + public DateTime? HandledTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; protected IncomingEventRecord() { @@ -58,7 +62,11 @@ public class IncomingEventRecord : MessageId, EventName, EventData, - CreationTime + CreationTime, + Status, + HandledTime, + RetryCount, + NextRetryTime ); foreach (var property in ExtraProperties) @@ -71,7 +79,20 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { - Processed = true; - ProcessedTime = processedTime; + Status = IncomingEventStatus.Processed; + HandledTime = processedTime; + } + + public void MarkAsDiscarded(DateTime discardedTime) + { + Status = IncomingEventStatus.Discarded; + HandledTime = discardedTime; + } + + public void RetryLater(int retryCount, DateTime nextRetryTime) + { + Status = IncomingEventStatus.Pending; + NextRetryTime = nextRetryTime; + RetryCount = retryCount; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs index c154330495..66b605d576 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -14,6 +14,10 @@ public interface IEventInbox Task MarkAsProcessedAsync(Guid id); + Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime); + + Task MarkAsDiscardAsync(Guid id); + Task ExistsByMessageIdAsync(string messageId); Task DeleteOldEventsAsync(); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index 1be24a3d02..23e59ad61a 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -20,6 +20,14 @@ public class IncomingEventInfo : IIncomingEventInfo public DateTime CreationTime { get; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; + + public DateTime? HandledTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; + protected IncomingEventInfo() { ExtraProperties = new ExtraPropertyDictionary(); @@ -31,13 +39,21 @@ public class IncomingEventInfo : IIncomingEventInfo string messageId, string eventName, byte[] eventData, - DateTime creationTime) + DateTime creationTime, + IncomingEventStatus status = IncomingEventStatus.Pending, + DateTime? handledTime = null, + int retryCount = 0, + DateTime? nextRetryTime = null) { Id = id; MessageId = messageId; EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); EventData = eventData; CreationTime = creationTime; + Status = status; + HandledTime = handledTime; + RetryCount = retryCount; + NextRetryTime = nextRetryTime; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs new file mode 100644 index 0000000000..27a210770c --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventStatus.cs @@ -0,0 +1,10 @@ +namespace Volo.Abp.EventBus.Distributed; + +public enum IncomingEventStatus +{ + Pending = 0, + + Discarded = 1, + + Processed = 2 +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs index cc91cad5df..6febf2f065 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs @@ -36,6 +36,23 @@ public class AbpEventBusBoxesOptions /// public TimeSpan PeriodTimeSpan { get; set; } + /// + /// Default: + /// + public InboxProcessorFailurePolicy InboxProcessorFailurePolicy { get; set; } = InboxProcessorFailurePolicy.Retry; + + /// + /// Default: 10 + /// + public int InboxProcessorMaxRetryCount { get; set; } = 10; + + /// + /// Default value is 10 + /// The initial retry delay factor (double) when `InboxProcessorFailurePolicy` is `RetryLater`. + /// The delay is calculated as: `delay = InboxProcessorRetryBackoffFactor × 2^retryCount` + /// + public double InboxProcessorRetryBackoffFactor { get; set; } = 10; + /// /// Default: 15 seconds /// diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 06014701a2..b00ba81242 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -25,7 +26,6 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected IEventInbox Inbox { get; private set; } = default!; protected InboxConfig InboxConfig { get; private set; } = default!; protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } - protected DateTime? LastCleanTime { get; set; } protected string DistributedLockName { get; set; } = default!; @@ -103,18 +103,76 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency foreach (var waitingEvent in waitingEvents) { - using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + Logger.LogInformation($"Start processing the incoming event with id = {waitingEvent.Id:N}"); + + try { - await DistributedEventBus - .AsSupportsEventBoxes() - .ProcessFromInboxAsync(waitingEvent, InboxConfig); + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + await DistributedEventBus + .AsSupportsEventBoxes() + .ProcessFromInboxAsync(waitingEvent, InboxConfig); - await Inbox.MarkAsProcessedAsync(waitingEvent.Id); + await Inbox.MarkAsProcessedAsync(waitingEvent.Id); - await uow.CompleteAsync(StoppingToken); - } + await uow.CompleteAsync(StoppingToken); + } - Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); + Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); + } + catch (Exception e) + { + Logger.LogError(e, $"Event with id = {waitingEvent.Id:N} processing failed."); + + if (EventBusBoxesOptions.InboxProcessorFailurePolicy == InboxProcessorFailurePolicy.Retry) + { + throw; + } + + if (EventBusBoxesOptions.InboxProcessorFailurePolicy == InboxProcessorFailurePolicy.RetryLater) + { + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + if (waitingEvent.NextRetryTime != null) + { + waitingEvent.RetryCount++; + } + + if (waitingEvent.RetryCount >= EventBusBoxesOptions.InboxProcessorMaxRetryCount) + { + Logger.LogWarning($"Event with id = {waitingEvent.Id:N} has exceeded the maximum retry count. Marking it as discarded."); + + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount, null); + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); + await uow.CompleteAsync(StoppingToken); + continue; + } + + waitingEvent.NextRetryTime = GetNextRetryTime(waitingEvent.RetryCount, EventBusBoxesOptions.InboxProcessorRetryBackoffFactor); + + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will retry later. " + + $"Current retry count: {waitingEvent.RetryCount}, " + + $"Next retry time: {waitingEvent.NextRetryTime}, " + + $"Max retry count: {EventBusBoxesOptions.InboxProcessorMaxRetryCount}."); + + await Inbox.RetryLaterAsync(waitingEvent.Id, waitingEvent.RetryCount, GetNextRetryTime(waitingEvent.RetryCount, EventBusBoxesOptions.InboxProcessorRetryBackoffFactor)); + await uow.CompleteAsync(StoppingToken); + } + continue; + } + + if (EventBusBoxesOptions.InboxProcessorFailurePolicy == InboxProcessorFailurePolicy.Discard) + { + using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) + { + Logger.LogInformation($"Event with id = {waitingEvent.Id:N} will be discarded."); + + await Inbox.MarkAsDiscardAsync(waitingEvent.Id); + await uow.CompleteAsync(StoppingToken); + } + continue; + } + } } } } @@ -130,6 +188,12 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } } + protected virtual DateTime? GetNextRetryTime(int retryCount, double factor) + { + var delaySeconds = factor * Math.Pow(2, retryCount); + return DateTime.Now.AddSeconds(delaySeconds); + } + protected virtual async Task> GetWaitingEventsAsync() { return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs new file mode 100644 index 0000000000..31095b62a3 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessorFailurePolicy.cs @@ -0,0 +1,22 @@ +namespace Volo.Abp.EventBus.Distributed; + +public enum InboxProcessorFailurePolicy +{ + /// + /// Default behavior, retry the following event in next period time. + /// + Retry, + + /// + /// Skip the failed event and retry it after a delay. + /// The delay doubles with each retry, starting from the configured InboxProcessorRetryBackoffFactor + /// (e.g., 10, 20, 40, 80 seconds, etc.). + /// The event is discarded if it still fails after reaching the maximum retry count. + /// + RetryLater, + + /// + /// Skip the event and do not retry it. + /// + Discard, +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs index a48492d1da..0a3a1d4dd2 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs @@ -23,9 +23,13 @@ public class IncomingEventRecord : public DateTime CreationTime { get; private set; } - public bool Processed { get; set; } + public IncomingEventStatus Status { get; set; } = IncomingEventStatus.Pending; - public DateTime? ProcessedTime { get; set; } + public DateTime? HandledTime { get; set; } + + public int RetryCount { get; set; } = 0; + + public DateTime? NextRetryTime { get; set; } = null; protected IncomingEventRecord() { @@ -41,7 +45,10 @@ public class IncomingEventRecord : EventName = eventInfo.EventName; EventData = eventInfo.EventData; CreationTime = eventInfo.CreationTime; - + Status = eventInfo.Status; + HandledTime = eventInfo.HandledTime; + RetryCount = eventInfo.RetryCount; + NextRetryTime = eventInfo.NextRetryTime; ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); foreach (var property in eventInfo.ExtraProperties) @@ -57,7 +64,11 @@ public class IncomingEventRecord : MessageId, EventName, EventData, - CreationTime + CreationTime, + Status, + HandledTime, + RetryCount, + NextRetryTime ); foreach (var property in ExtraProperties) @@ -70,7 +81,20 @@ public class IncomingEventRecord : public void MarkAsProcessed(DateTime processedTime) { - Processed = true; - ProcessedTime = processedTime; + Status = IncomingEventStatus.Processed; + HandledTime = processedTime; + } + + public void MarkAsDiscarded(DateTime discardedTime) + { + Status = IncomingEventStatus.Discarded; + HandledTime = discardedTime; + } + + public void RetryLater(int retryCount, DateTime nextRetryTime) + { + Status = IncomingEventStatus.Pending; + NextRetryTime = nextRetryTime; + RetryCount = retryCount; } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index e5d726526b..8baeb82f3a 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -61,10 +61,12 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; } + var now = Clock.Now; var outgoingEventRecords = await dbContext .IncomingEvents .AsQueryable() - .Where(x => x.Processed == false) + .Where(x => x.Status == IncomingEventStatus.Pending) + .Where(x => x.NextRetryTime == null || x.NextRetryTime <= now) .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) @@ -81,7 +83,43 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb var dbContext = await DbContextProvider.GetDbContextAsync(); var filter = Builders.Filter.Eq(x => x.Id, id); - var update = Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now); + var update = Builders.Update.Set(x => x.Status, IncomingEventStatus.Processed).Set(x => x.HandledTime, Clock.Now); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } + } + + [UnitOfWork] + public virtual async Task RetryLaterAsync(Guid id, int retryCount, DateTime? nextRetryTime) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.RetryCount, retryCount).Set(x => x.NextRetryTime, nextRetryTime); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } + } + + [UnitOfWork] + public virtual async Task MarkAsDiscardAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.Status, IncomingEventStatus.Discarded).Set(x => x.HandledTime, Clock.Now); if (dbContext.SessionHandle != null) { @@ -108,11 +146,11 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb if (dbContext.SessionHandle != null) { - await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents); + await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents); } else { - await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents); + await dbContext.IncomingEvents.DeleteManyAsync(x => (x.Status == IncomingEventStatus.Processed || x.Status == IncomingEventStatus.Discarded) && x.CreationTime < timeToKeepEvents); } } } diff --git a/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs b/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs index a21e0f8ea0..81953c54f4 100644 --- a/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs +++ b/modules/identity/src/Volo.Abp.Identity.EntityFrameworkCore/Volo/Abp/Identity/EntityFrameworkCore/EfCoreIdentityUserDelegationRepository.cs @@ -31,22 +31,24 @@ public class EfCoreIdentityUserDelegationRepository : EfCoreRepository> GetActiveDelegationsAsync(Guid targetUserId, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetDbSetAsync()) .AsNoTracking() .Where(x => x.TargetUserId == targetUserId && - x.StartTime <= Clock.Now && - x.EndTime >= Clock.Now) + x.StartTime <= now && + x.EndTime >= now) .ToListAsync(cancellationToken: cancellationToken); } public virtual async Task FindActiveDelegationByIdAsync(Guid id, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetDbSetAsync()) .AsNoTracking() .FirstOrDefaultAsync(x => x.Id == id && - x.StartTime <= Clock.Now && - x.EndTime >= Clock.Now + x.StartTime <= now && + x.EndTime >= now , cancellationToken: GetCancellationToken(cancellationToken)); } } diff --git a/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs b/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs index 1855069b4a..bd96ed4164 100644 --- a/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs +++ b/modules/identity/src/Volo.Abp.Identity.MongoDB/Volo/Abp/Identity/MongoDB/MongoIdentityUserDelegationRepository.cs @@ -32,19 +32,21 @@ public class MongoIdentityUserDelegationRepository : MongoDbRepository> GetActiveDelegationsAsync(Guid targetUserId, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetQueryableAsync(cancellationToken)) .Where(x => x.TargetUserId == targetUserId) - .Where(x => x.StartTime <= Clock.Now && x.EndTime >= Clock.Now) + .Where(x => x.StartTime <= now && x.EndTime >= now) .ToListAsync(cancellationToken: cancellationToken); } public virtual async Task FindActiveDelegationByIdAsync(Guid id, CancellationToken cancellationToken = default) { + var now = Clock.Now; return await (await GetQueryableAsync(cancellationToken)) .FirstOrDefaultAsync(x => x.Id == id && - x.StartTime <= Clock.Now && - x.EndTime >= Clock.Now + x.StartTime <= now && + x.EndTime >= now , cancellationToken: GetCancellationToken(cancellationToken)); } }