From f6af11b7c6b0a44a3c6908c5bdfa89da9dda00a3 Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 24 Dec 2024 14:48:35 +0800 Subject: [PATCH 1/4] Added `filter` to `GetWaitingEventsAsync` method. --- .../DistributedEvents/DbContextEventInbox.cs | 10 ++++- .../DistributedEvents/DbContextEventOutbox.cs | 10 ++++- .../DistributedEvents/IncomingEventRecord.cs | 1 + .../DistributedEvents/OutgoingEventRecord.cs | 1 + .../Abp/EventBus/Distributed/IEventInbox.cs | 3 +- .../Abp/EventBus/Distributed/IEventOutbox.cs | 3 +- .../Distributed/IIncomingEventInfo.cs | 17 +++++++++ .../Distributed/IOutgoingEventInfo.cs | 15 ++++++++ .../InboxOutboxFilterExpressionTransformer.cs | 38 +++++++++++++++++++ .../EventBus/Distributed/IncomingEventInfo.cs | 2 +- .../EventBus/Distributed/OutgoingEventInfo.cs | 2 +- .../Distributed/AbpEventBusBoxesOptions.cs | 15 +++++++- .../EventBus/Distributed/InboxProcessor.cs | 2 +- .../Abp/EventBus/Distributed/OutboxSender.cs | 10 ++--- .../MongoDbContextEventInbox.cs | 11 +++++- .../MongoDbContextEventOutbox.cs | 11 +++++- 16 files changed, 136 insertions(+), 15 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index f5953ea3b1..54ddfd0e5e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; @@ -36,14 +37,21 @@ public class DbContextEventInbox : IDbContextEventInbox } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = await DbContextProvider.GetDbContextAsync(); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .IncomingEvents .AsNoTracking() .Where(x => !x.Processed) + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) .ToListAsync(cancellationToken: cancellationToken); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index fecfd1e8ce..a541698b3e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; @@ -28,13 +29,20 @@ public class DbContextEventOutbox : IDbContextEventOutbox> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync(); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .OutgoingEvents .AsNoTracking() + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) .ToListAsync(cancellationToken: cancellationToken); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index 6cb37a72d5..be7da15890 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents; public class IncomingEventRecord : BasicAggregateRoot, + IIncomingEventInfo, IHasExtraProperties, IHasCreationTime { diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs index 7272c9ac30..625a93b25a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs @@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents; public class OutgoingEventRecord : BasicAggregateRoot, + IOutgoingEventInfo, IHasExtraProperties, IHasCreationTime { diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs index 3700f74232..c154330495 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; @@ -9,7 +10,7 @@ public interface IEventInbox { Task EnqueueAsync(IncomingEventInfo incomingEvent); - Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default); Task MarkAsProcessedAsync(Guid id); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs index 018747945c..bc538ef839 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; @@ -9,7 +10,7 @@ public interface IEventOutbox { Task EnqueueAsync(OutgoingEventInfo outgoingEvent); - Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default); Task DeleteAsync(Guid id); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs new file mode 100644 index 0000000000..7f1f1be640 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs @@ -0,0 +1,17 @@ +using System; +using Volo.Abp.Data; + +namespace Volo.Abp.EventBus.Distributed; + +public interface IIncomingEventInfo : IHasExtraProperties +{ + public Guid Id { get; } + + public string MessageId { get; } + + public string EventName { get; } + + public byte[] EventData { get; } + + public DateTime CreationTime { get; } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs new file mode 100644 index 0000000000..9fe4d35eb5 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs @@ -0,0 +1,15 @@ +using System; +using Volo.Abp.Data; + +namespace Volo.Abp.EventBus.Distributed; + +public interface IOutgoingEventInfo : IHasExtraProperties +{ + public Guid Id { get; } + + public string EventName { get; } + + public byte[] EventData { get; } + + public DateTime CreationTime { get; } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs new file mode 100644 index 0000000000..935e760546 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs @@ -0,0 +1,38 @@ +using System; +using System.Linq.Expressions; + +namespace Volo.Abp.EventBus.Distributed; + +public static class InboxOutboxFilterExpressionTransformer +{ + public static Expression> Transform(Expression> originalExpression) + { + var originalParam = originalExpression.Parameters[0]; + var newParam = Expression.Parameter(typeof(TTarget), originalParam.Name); + var body = ReplaceParameter(originalExpression.Body, originalParam, newParam); + return Expression.Lambda>(body, newParam); + } + + private static Expression ReplaceParameter(Expression body, ParameterExpression oldParam, ParameterExpression newParam) + { + var visitor = new ParameterReplacer(oldParam, newParam); + return visitor.Visit(body); + } + + private class ParameterReplacer : ExpressionVisitor + { + private readonly ParameterExpression _oldParam; + private readonly ParameterExpression _newParam; + + public ParameterReplacer(ParameterExpression oldParam, ParameterExpression newParam) + { + _oldParam = oldParam; + _newParam = newParam; + } + + protected override Expression VisitParameter(ParameterExpression node) + { + return node == _oldParam ? _newParam : base.VisitParameter(node); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index ffd135845a..1be24a3d02 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -4,7 +4,7 @@ using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; -public class IncomingEventInfo : IHasExtraProperties +public class IncomingEventInfo : IIncomingEventInfo { public static int MaxEventNameLength { get; set; } = 256; diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs index 43e9c42bf8..74b5bca7d4 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs @@ -4,7 +4,7 @@ using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; -public class OutgoingEventInfo : IHasExtraProperties +public class OutgoingEventInfo : IOutgoingEventInfo { public static int MaxEventNameLength { get; set; } = 256; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs index 67facf5d42..cc91cad5df 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Linq.Expressions; namespace Volo.Abp.EventBus.Distributed; @@ -14,11 +15,21 @@ public class AbpEventBusBoxesOptions /// public int InboxWaitingEventMaxCount { get; set; } + /// + /// Default: null, means all events + /// + public Expression>? InboxProcessorFilter { get; set; } + /// /// Default: 1000 /// public int OutboxWaitingEventMaxCount { get; set; } + /// + /// Default: null, means all events + /// + public Expression>? OutboxProcessorFilter { get; set; } + /// /// Period time of and /// Default: 2 seconds @@ -34,7 +45,7 @@ public class AbpEventBusBoxesOptions /// Default: 2 hours /// public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; } - + /// /// Default: true /// @@ -44,7 +55,9 @@ public class AbpEventBusBoxesOptions { CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6); InboxWaitingEventMaxCount = 1000; + InboxProcessorFilter = null; OutboxWaitingEventMaxCount = 1000; + OutboxProcessorFilter = null; PeriodTimeSpan = TimeSpan.FromSeconds(2); DistributedLockWaitDuration = TimeSpan.FromSeconds(15); WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index e2c0a3c0c6..896264b16c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -92,7 +92,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency while (true) { - var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken); + var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); if (waitingEvents.Count <= 0) { break; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs index 99ee06ca10..a398dc8478 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs @@ -77,14 +77,14 @@ public class OutboxSender : IOutboxSender, ITransientDependency { while (true) { - var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken); + var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, EventBusBoxesOptions.OutboxProcessorFilter, StoppingToken); if (waitingEvents.Count <= 0) { break; } Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox."); - + if (EventBusBoxesOptions.BatchPublishOutboxEvents) { await PublishOutgoingMessagesInBatchAsync(waitingEvents); @@ -119,7 +119,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency ); await Outbox.DeleteAsync(waitingEvent.Id); - + Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}"); } } @@ -129,9 +129,9 @@ public class OutboxSender : IOutboxSender, ITransientDependency await DistributedEventBus .AsSupportsEventBoxes() .PublishManyFromOutboxAsync(waitingEvents, OutboxConfig); - + await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray()); - + Logger.LogInformation($"Sent {waitingEvents.Count} events to message broker"); } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 55b40fa29b..6a73af27f5 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Options; @@ -50,16 +51,24 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .IncomingEvents .AsQueryable() .Where(x => !x.Processed) + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) + .As>() .ToListAsync(cancellationToken: cancellationToken); return outgoingEventRecords diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs index cc6c4d42df..57b59038b5 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; @@ -40,14 +41,22 @@ public class MongoDbContextEventOutbox : IMongoDbContextEventOu } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox)await MongoDbContextProvider.GetDbContextAsync(cancellationToken); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .OutgoingEvents.AsQueryable() + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) + .As>() .ToListAsync(cancellationToken: cancellationToken); return outgoingEventRecords From b4b4738e425a68a7ec3c07475507b5d0dc2a285d Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 24 Dec 2024 15:11:14 +0800 Subject: [PATCH 2/4] Added `virtual GetWaitingEventsAsync` to `InboxProcessor/OutboxSender`. --- .../Volo/Abp/EventBus/Distributed/InboxProcessor.cs | 12 +++++++++--- .../Volo/Abp/EventBus/Distributed/OutboxSender.cs | 7 ++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 896264b16c..6ce431daba 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -60,7 +61,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency await RunAsync(); } - public Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default) + public virtual Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default) { InboxConfig = inboxConfig; Inbox = (IEventInbox)ServiceProvider.GetRequiredService(inboxConfig.ImplementationType); @@ -69,7 +70,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency return Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken = default) + public virtual Task StopAsync(CancellationToken cancellationToken = default) { StoppingTokenSource.Cancel(); Timer.Stop(cancellationToken); @@ -92,7 +93,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency while (true) { - var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); + var waitingEvents = await GetWaitingEventsAsync(); if (waitingEvents.Count <= 0) { break; @@ -129,6 +130,11 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency } } + protected virtual async Task> GetWaitingEventsAsync() + { + return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); + } + protected virtual async Task DeleteOldEventsAsync() { if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs index a398dc8478..bbc41e3f86 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs @@ -77,7 +77,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency { while (true) { - var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, EventBusBoxesOptions.OutboxProcessorFilter, StoppingToken); + var waitingEvents = await GetWaitingEventsAsync(); if (waitingEvents.Count <= 0) { break; @@ -107,6 +107,11 @@ public class OutboxSender : IOutboxSender, ITransientDependency } } + protected virtual async Task> GetWaitingEventsAsync() + { + return await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, EventBusBoxesOptions.OutboxProcessorFilter, StoppingToken); + } + protected virtual async Task PublishOutgoingMessagesAsync(List waitingEvents) { foreach (var waitingEvent in waitingEvents) From 1587b91f2a33c7fde58d953c639db39030e28bda Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 24 Dec 2024 15:34:51 +0800 Subject: [PATCH 3/4] Allow to change `DistributedLockName` in sub-class. --- .../Volo/Abp/EventBus/Distributed/InboxProcessor.cs | 2 +- .../Volo/Abp/EventBus/Distributed/OutboxSender.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index 6ce431daba..06014701a2 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -28,7 +28,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency protected DateTime? LastCleanTime { get; set; } - protected string DistributedLockName { get; private set; } = default!; + protected string DistributedLockName { get; set; } = default!; public ILogger Logger { get; set; } protected CancellationTokenSource StoppingTokenSource { get; } protected CancellationToken StoppingToken { get; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs index bbc41e3f86..2e610d4471 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs @@ -22,7 +22,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency protected IEventOutbox Outbox { get; private set; } = default!; protected OutboxConfig OutboxConfig { get; private set; } = default!; protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } - protected string DistributedLockName { get; private set; } = default!; + protected string DistributedLockName { get; set; } = default!; public ILogger Logger { get; set; } protected CancellationTokenSource StoppingTokenSource { get; } From deba424346c08df1e2e787a66d67f6607ca8eef2 Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 24 Dec 2024 15:46:26 +0800 Subject: [PATCH 4/4] Remove `public` keyword from `interface`. --- .../Abp/EventBus/Distributed/IIncomingEventInfo.cs | 10 +++++----- .../Abp/EventBus/Distributed/IOutgoingEventInfo.cs | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs index 7f1f1be640..e52325b71f 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs @@ -5,13 +5,13 @@ namespace Volo.Abp.EventBus.Distributed; public interface IIncomingEventInfo : IHasExtraProperties { - public Guid Id { get; } + Guid Id { get; } - public string MessageId { get; } + string MessageId { get; } - public string EventName { get; } + string EventName { get; } - public byte[] EventData { get; } + byte[] EventData { get; } - public DateTime CreationTime { get; } + DateTime CreationTime { get; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs index 9fe4d35eb5..58dd4a9713 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs @@ -5,11 +5,11 @@ namespace Volo.Abp.EventBus.Distributed; public interface IOutgoingEventInfo : IHasExtraProperties { - public Guid Id { get; } + Guid Id { get; } - public string EventName { get; } + string EventName { get; } - public byte[] EventData { get; } + byte[] EventData { get; } - public DateTime CreationTime { get; } + DateTime CreationTime { get; } }