From 3ee9f57c9dd50a6ecb77b121cb74808aaae17e47 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Thu, 30 Sep 2021 15:17:19 +0800 Subject: [PATCH] Improve --- .../Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj | 2 +- .../Volo/Abp/Domain/AbpDddDomainModule.cs | 3 ++- .../DistributedEvents/OracleDbContextEventInbox.cs | 6 ++++-- .../DistributedEvents/OracleDbContextEventInbox.cs | 7 ++++--- .../PostgreSqlDbContextEventInbox.cs | 7 ++++--- .../AbpEntityFrameworkCoreModule.cs | 2 -- .../DistributedEvents/DbContextEventInbox.cs | 9 +++++---- .../DistributedEvents/SqlRawDbContextEventInbox.cs | 8 ++++---- .../Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs | 11 ++++++++--- .../Volo/Abp/EventBus/Boxes/InboxProcessor.cs | 8 ++++---- .../Volo/Abp/EventBus/Boxes/OutboxSender.cs | 4 ++-- .../Distributed/AbpDistributedEventBusOptions.cs | 8 -------- .../Abp/MongoDB/DistributedEvents/IHasEventInbox.cs | 4 ++-- .../MongoDB/DistributedEvents/IHasEventOutbox.cs | 4 ++-- .../DistributedEvents/MongoDbContextEventInbox.cs | 9 +++++---- .../DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs | 13 +++---------- 16 files changed, 50 insertions(+), 55 deletions(-) diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj b/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj index 87879334af..68c90489ff 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj @@ -17,7 +17,7 @@ - + diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs index 523a6ee4f2..38e2d9fc84 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs @@ -3,6 +3,7 @@ using Volo.Abp.Auditing; using Volo.Abp.Data; using Volo.Abp.Domain.Repositories; using Volo.Abp.EventBus; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.ExceptionHandling; using Volo.Abp.Guids; using Volo.Abp.Modularity; @@ -18,7 +19,7 @@ namespace Volo.Abp.Domain [DependsOn( typeof(AbpAuditingModule), typeof(AbpDataModule), - typeof(AbpEventBusModule), + typeof(AbpEventBusBoxesModule), typeof(AbpGuidsModule), typeof(AbpMultiTenancyModule), typeof(AbpThreadingModule), diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs index d5f1b7afcc..ee73291e58 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,7 +15,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public OracleDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -33,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now.Add(- EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents); var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs index d5f1b7afcc..6d6d04d267 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; -using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,7 +14,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public OracleDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -33,7 +34,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs index 4560bdb2c4..fedff90dec 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,8 +15,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public PostgreSqlDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) - : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -34,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index f7834c1835..96c245be2e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -2,10 +2,8 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Volo.Abp.Domain; -using Volo.Abp.EntityFrameworkCore.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Modularity; -using Volo.Abp.Uow; using Volo.Abp.Uow.EntityFrameworkCore; namespace Volo.Abp.EntityFrameworkCore diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 982d335d17..242b601177 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -15,17 +16,17 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents where TDbContext : IHasEventInbox { protected IDbContextProvider DbContextProvider { get; } - protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) + IOptions eventBusBoxesOptions) { DbContextProvider = dbContextProvider; Clock = clock; - DistributedEventsOptions = distributedEventsOptions.Value; + EventBusBoxesOptions = eventBusBoxesOptions.Value; } [UnitOfWork] @@ -78,7 +79,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var oldEvents = await dbContext.IncomingEvents .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) .ToListAsync(); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs index ce7302755a..cb764ede17 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; -using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,8 +14,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public SqlRawDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) - : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -34,7 +34,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs index da91d672d3..1d72666c42 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs @@ -26,10 +26,14 @@ namespace Volo.Abp.EventBus.Boxes public TimeSpan PeriodTimeSpan { get; set; } /// - /// Delay time of and /// Default: 15 seconds /// - public TimeSpan DelayTimeSpan { get; set; } + public TimeSpan DistributedLockWaitDuration { get; set; } + + /// + /// Default: 2 hours + /// + public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; } public AbpEventBusBoxesOptions() { @@ -37,7 +41,8 @@ namespace Volo.Abp.EventBus.Boxes InboxWaitingEventMaxCount = 1000; OutboxWaitingEventMaxCount = 1000; PeriodTimeSpan = TimeSpan.FromSeconds(2); - DelayTimeSpan = TimeSpan.FromSeconds(15); + DistributedLockWaitDuration = TimeSpan.FromSeconds(15); + WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2); } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs index 29cca3d624..e3c71771ad 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs @@ -49,7 +49,7 @@ namespace Volo.Abp.EventBus.Boxes UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; - Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -120,21 +120,21 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken); + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); } } } protected virtual async Task DeleteOldEventsAsync() { - if (LastCleanTime != null && LastCleanTime > Clock.Now.Add(EventBusBoxesOptions.CleanOldEventTimeIntervalSpan)) + if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now) { return; } await Inbox.DeleteOldEventsAsync(); - LastCleanTime = DateTime.Now; + LastCleanTime = Clock.Now; } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index e8a05c60ba..32545227c3 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -39,7 +39,7 @@ namespace Volo.Abp.EventBus.Boxes DistributedEventBus = distributedEventBus; DistributedLockProvider = distributedLockProvider; EventBusBoxesOptions = eventBusBoxesOptions.Value; - Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -100,7 +100,7 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken); + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs index 5b71542665..ab8fe6823b 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs @@ -1,4 +1,3 @@ -using System; using Volo.Abp.Collections; namespace Volo.Abp.EventBus.Distributed @@ -10,18 +9,11 @@ namespace Volo.Abp.EventBus.Distributed public OutboxConfigDictionary Outboxes { get; } public InboxConfigDictionary Inboxes { get; } - - /// - /// Default: -2 hours - /// - public TimeSpan InboxKeepEventTimeSpan { get; set; } - public AbpDistributedEventBusOptions() { Handlers = new TypeList(); Outboxes = new OutboxConfigDictionary(); Inboxes = new InboxConfigDictionary(); - InboxKeepEventTimeSpan = TimeSpan.FromHours(-2); } } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs index 387e860bce..792a85e124 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs @@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents { public interface IHasEventInbox : IAbpMongoDbContext { - IMongoCollection IncomingEvents { get; set; } + IMongoCollection IncomingEvents { get; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs index cf57aaa699..ab4bc10f35 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs @@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents { public interface IHasEventOutbox : IAbpMongoDbContext { - IMongoCollection OutgoingEvents { get; set; } + IMongoCollection OutgoingEvents { get; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 62927d2279..3cf5485512 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.Options; using MongoDB.Driver; using MongoDB.Driver.Linq; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -16,17 +17,17 @@ namespace Volo.Abp.MongoDB.DistributedEvents where TMongoDbContext : IHasEventInbox { protected IMongoDbContextProvider DbContextProvider { get; } - protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected IClock Clock { get; } public MongoDbContextEventInbox( IMongoDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) + IOptions eventBusBoxesOptions) { DbContextProvider = dbContextProvider; Clock = clock; - DistributedEventsOptions = distributedEventsOptions.Value; + EventBusBoxesOptions = eventBusBoxesOptions.Value; } @@ -96,7 +97,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; if (dbContext.SessionHandle != null) { diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs index a7f1b78f86..95370bb4d2 100644 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs @@ -11,16 +11,9 @@ namespace DistDemoApp public IMongoCollection TodoItems => Collection(); public IMongoCollection TodoSummaries => Collection(); - public IMongoCollection OutgoingEvents - { - get => Collection(); - set {} - } - public IMongoCollection IncomingEvents - { - get => Collection(); - set {} - } + public IMongoCollection OutgoingEvents => Collection(); + + public IMongoCollection IncomingEvents => Collection(); } }