From f6af11b7c6b0a44a3c6908c5bdfa89da9dda00a3 Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 24 Dec 2024 14:48:35 +0800 Subject: [PATCH] Added `filter` to `GetWaitingEventsAsync` method. --- .../DistributedEvents/DbContextEventInbox.cs | 10 ++++- .../DistributedEvents/DbContextEventOutbox.cs | 10 ++++- .../DistributedEvents/IncomingEventRecord.cs | 1 + .../DistributedEvents/OutgoingEventRecord.cs | 1 + .../Abp/EventBus/Distributed/IEventInbox.cs | 3 +- .../Abp/EventBus/Distributed/IEventOutbox.cs | 3 +- .../Distributed/IIncomingEventInfo.cs | 17 +++++++++ .../Distributed/IOutgoingEventInfo.cs | 15 ++++++++ .../InboxOutboxFilterExpressionTransformer.cs | 38 +++++++++++++++++++ .../EventBus/Distributed/IncomingEventInfo.cs | 2 +- .../EventBus/Distributed/OutgoingEventInfo.cs | 2 +- .../Distributed/AbpEventBusBoxesOptions.cs | 15 +++++++- .../EventBus/Distributed/InboxProcessor.cs | 2 +- .../Abp/EventBus/Distributed/OutboxSender.cs | 10 ++--- .../MongoDbContextEventInbox.cs | 11 +++++- .../MongoDbContextEventOutbox.cs | 11 +++++- 16 files changed, 136 insertions(+), 15 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index f5953ea3b1..54ddfd0e5e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; @@ -36,14 +37,21 @@ public class DbContextEventInbox : IDbContextEventInbox } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = await DbContextProvider.GetDbContextAsync(); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .IncomingEvents .AsNoTracking() .Where(x => !x.Processed) + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) .ToListAsync(cancellationToken: cancellationToken); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index fecfd1e8ce..a541698b3e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; @@ -28,13 +29,20 @@ public class DbContextEventOutbox : IDbContextEventOutbox> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync(); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .OutgoingEvents .AsNoTracking() + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) .ToListAsync(cancellationToken: cancellationToken); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index 6cb37a72d5..be7da15890 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents; public class IncomingEventRecord : BasicAggregateRoot, + IIncomingEventInfo, IHasExtraProperties, IHasCreationTime { diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs index 7272c9ac30..625a93b25a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs @@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents; public class OutgoingEventRecord : BasicAggregateRoot, + IOutgoingEventInfo, IHasExtraProperties, IHasCreationTime { diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs index 3700f74232..c154330495 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; @@ -9,7 +10,7 @@ public interface IEventInbox { Task EnqueueAsync(IncomingEventInfo incomingEvent); - Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default); Task MarkAsProcessedAsync(Guid id); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs index 018747945c..bc538ef839 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; @@ -9,7 +10,7 @@ public interface IEventOutbox { Task EnqueueAsync(OutgoingEventInfo outgoingEvent); - Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default); Task DeleteAsync(Guid id); diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs new file mode 100644 index 0000000000..7f1f1be640 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs @@ -0,0 +1,17 @@ +using System; +using Volo.Abp.Data; + +namespace Volo.Abp.EventBus.Distributed; + +public interface IIncomingEventInfo : IHasExtraProperties +{ + public Guid Id { get; } + + public string MessageId { get; } + + public string EventName { get; } + + public byte[] EventData { get; } + + public DateTime CreationTime { get; } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs new file mode 100644 index 0000000000..9fe4d35eb5 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs @@ -0,0 +1,15 @@ +using System; +using Volo.Abp.Data; + +namespace Volo.Abp.EventBus.Distributed; + +public interface IOutgoingEventInfo : IHasExtraProperties +{ + public Guid Id { get; } + + public string EventName { get; } + + public byte[] EventData { get; } + + public DateTime CreationTime { get; } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs new file mode 100644 index 0000000000..935e760546 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs @@ -0,0 +1,38 @@ +using System; +using System.Linq.Expressions; + +namespace Volo.Abp.EventBus.Distributed; + +public static class InboxOutboxFilterExpressionTransformer +{ + public static Expression> Transform(Expression> originalExpression) + { + var originalParam = originalExpression.Parameters[0]; + var newParam = Expression.Parameter(typeof(TTarget), originalParam.Name); + var body = ReplaceParameter(originalExpression.Body, originalParam, newParam); + return Expression.Lambda>(body, newParam); + } + + private static Expression ReplaceParameter(Expression body, ParameterExpression oldParam, ParameterExpression newParam) + { + var visitor = new ParameterReplacer(oldParam, newParam); + return visitor.Visit(body); + } + + private class ParameterReplacer : ExpressionVisitor + { + private readonly ParameterExpression _oldParam; + private readonly ParameterExpression _newParam; + + public ParameterReplacer(ParameterExpression oldParam, ParameterExpression newParam) + { + _oldParam = oldParam; + _newParam = newParam; + } + + protected override Expression VisitParameter(ParameterExpression node) + { + return node == _oldParam ? _newParam : base.VisitParameter(node); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index ffd135845a..1be24a3d02 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -4,7 +4,7 @@ using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; -public class IncomingEventInfo : IHasExtraProperties +public class IncomingEventInfo : IIncomingEventInfo { public static int MaxEventNameLength { get; set; } = 256; diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs index 43e9c42bf8..74b5bca7d4 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs @@ -4,7 +4,7 @@ using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; -public class OutgoingEventInfo : IHasExtraProperties +public class OutgoingEventInfo : IOutgoingEventInfo { public static int MaxEventNameLength { get; set; } = 256; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs index 67facf5d42..cc91cad5df 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Linq.Expressions; namespace Volo.Abp.EventBus.Distributed; @@ -14,11 +15,21 @@ public class AbpEventBusBoxesOptions /// public int InboxWaitingEventMaxCount { get; set; } + /// + /// Default: null, means all events + /// + public Expression>? InboxProcessorFilter { get; set; } + /// /// Default: 1000 /// public int OutboxWaitingEventMaxCount { get; set; } + /// + /// Default: null, means all events + /// + public Expression>? OutboxProcessorFilter { get; set; } + /// /// Period time of and /// Default: 2 seconds @@ -34,7 +45,7 @@ public class AbpEventBusBoxesOptions /// Default: 2 hours /// public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; } - + /// /// Default: true /// @@ -44,7 +55,9 @@ public class AbpEventBusBoxesOptions { CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6); InboxWaitingEventMaxCount = 1000; + InboxProcessorFilter = null; OutboxWaitingEventMaxCount = 1000; + OutboxProcessorFilter = null; PeriodTimeSpan = TimeSpan.FromSeconds(2); DistributedLockWaitDuration = TimeSpan.FromSeconds(15); WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs index e2c0a3c0c6..896264b16c 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs @@ -92,7 +92,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency while (true) { - var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken); + var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken); if (waitingEvents.Count <= 0) { break; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs index 99ee06ca10..a398dc8478 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs @@ -77,14 +77,14 @@ public class OutboxSender : IOutboxSender, ITransientDependency { while (true) { - var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken); + var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, EventBusBoxesOptions.OutboxProcessorFilter, StoppingToken); if (waitingEvents.Count <= 0) { break; } Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox."); - + if (EventBusBoxesOptions.BatchPublishOutboxEvents) { await PublishOutgoingMessagesInBatchAsync(waitingEvents); @@ -119,7 +119,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency ); await Outbox.DeleteAsync(waitingEvent.Id); - + Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}"); } } @@ -129,9 +129,9 @@ public class OutboxSender : IOutboxSender, ITransientDependency await DistributedEventBus .AsSupportsEventBoxes() .PublishManyFromOutboxAsync(waitingEvents, OutboxConfig); - + await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray()); - + Logger.LogInformation($"Sent {waitingEvents.Count} events to message broker"); } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 55b40fa29b..6a73af27f5 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Options; @@ -50,16 +51,24 @@ public class MongoDbContextEventInbox : IMongoDbContextEventInb } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .IncomingEvents .AsQueryable() .Where(x => !x.Processed) + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) + .As>() .ToListAsync(cancellationToken: cancellationToken); return outgoingEventRecords diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs index cc6c4d42df..57b59038b5 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; @@ -40,14 +41,22 @@ public class MongoDbContextEventOutbox : IMongoDbContextEventOu } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, Expression>? filter = null, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox)await MongoDbContextProvider.GetDbContextAsync(cancellationToken); + Expression>? transformedFilter = null; + if (filter != null) + { + transformedFilter = InboxOutboxFilterExpressionTransformer.Transform(filter)!; + } + var outgoingEventRecords = await dbContext .OutgoingEvents.AsQueryable() + .WhereIf(transformedFilter != null, transformedFilter!) .OrderBy(x => x.CreationTime) .Take(maxCount) + .As>() .ToListAsync(cancellationToken: cancellationToken); return outgoingEventRecords