Browse Source

Added `filter` to `GetWaitingEventsAsync` method.

pull/21716/head
maliming 1 year ago
parent
commit
f6af11b7c6
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 10
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  2. 10
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  3. 1
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs
  4. 1
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs
  5. 3
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs
  6. 3
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs
  7. 17
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs
  8. 15
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs
  9. 38
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs
  10. 2
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs
  11. 2
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs
  12. 15
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs
  13. 2
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs
  14. 10
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs
  15. 11
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs
  16. 11
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

10
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
@ -36,14 +37,21 @@ public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IIncomingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
Expression<Func<IncomingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IIncomingEventInfo, IncomingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsNoTracking()
.Where(x => !x.Processed)
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);

10
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
@ -28,13 +29,20 @@ public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext
}
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IOutgoingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
Expression<Func<OutgoingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IOutgoingEventInfo, OutgoingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.OutgoingEvents
.AsNoTracking()
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);

1
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs

@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class IncomingEventRecord :
BasicAggregateRoot<Guid>,
IIncomingEventInfo,
IHasExtraProperties,
IHasCreationTime
{

1
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs

@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class OutgoingEventRecord :
BasicAggregateRoot<Guid>,
IOutgoingEventInfo,
IHasExtraProperties,
IHasCreationTime
{

3
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
@ -9,7 +10,7 @@ public interface IEventInbox
{
Task EnqueueAsync(IncomingEventInfo incomingEvent);
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default);
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IIncomingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default);
Task MarkAsProcessedAsync(Guid id);

3
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
@ -9,7 +10,7 @@ public interface IEventOutbox
{
Task EnqueueAsync(OutgoingEventInfo outgoingEvent);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IOutgoingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default);
Task DeleteAsync(Guid id);

17
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs

@ -0,0 +1,17 @@
using System;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public interface IIncomingEventInfo : IHasExtraProperties
{
public Guid Id { get; }
public string MessageId { get; }
public string EventName { get; }
public byte[] EventData { get; }
public DateTime CreationTime { get; }
}

15
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs

@ -0,0 +1,15 @@
using System;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public interface IOutgoingEventInfo : IHasExtraProperties
{
public Guid Id { get; }
public string EventName { get; }
public byte[] EventData { get; }
public DateTime CreationTime { get; }
}

38
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs

@ -0,0 +1,38 @@
using System;
using System.Linq.Expressions;
namespace Volo.Abp.EventBus.Distributed;
public static class InboxOutboxFilterExpressionTransformer
{
public static Expression<Func<TTarget, bool>> Transform<TOriginal, TTarget>(Expression<Func<TOriginal, bool>> originalExpression)
{
var originalParam = originalExpression.Parameters[0];
var newParam = Expression.Parameter(typeof(TTarget), originalParam.Name);
var body = ReplaceParameter(originalExpression.Body, originalParam, newParam);
return Expression.Lambda<Func<TTarget, bool>>(body, newParam);
}
private static Expression ReplaceParameter(Expression body, ParameterExpression oldParam, ParameterExpression newParam)
{
var visitor = new ParameterReplacer(oldParam, newParam);
return visitor.Visit(body);
}
private class ParameterReplacer : ExpressionVisitor
{
private readonly ParameterExpression _oldParam;
private readonly ParameterExpression _newParam;
public ParameterReplacer(ParameterExpression oldParam, ParameterExpression newParam)
{
_oldParam = oldParam;
_newParam = newParam;
}
protected override Expression VisitParameter(ParameterExpression node)
{
return node == _oldParam ? _newParam : base.VisitParameter(node);
}
}
}

2
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs

@ -4,7 +4,7 @@ using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public class IncomingEventInfo : IHasExtraProperties
public class IncomingEventInfo : IIncomingEventInfo
{
public static int MaxEventNameLength { get; set; } = 256;

2
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs

@ -4,7 +4,7 @@ using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public class OutgoingEventInfo : IHasExtraProperties
public class OutgoingEventInfo : IOutgoingEventInfo
{
public static int MaxEventNameLength { get; set; } = 256;

15
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs

@ -1,4 +1,5 @@
using System;
using System.Linq.Expressions;
namespace Volo.Abp.EventBus.Distributed;
@ -14,11 +15,21 @@ public class AbpEventBusBoxesOptions
/// </summary>
public int InboxWaitingEventMaxCount { get; set; }
/// <summary>
/// Default: null, means all events
/// </summary>
public Expression<Func<IIncomingEventInfo, bool>>? InboxProcessorFilter { get; set; }
/// <summary>
/// Default: 1000
/// </summary>
public int OutboxWaitingEventMaxCount { get; set; }
/// <summary>
/// Default: null, means all events
/// </summary>
public Expression<Func<IOutgoingEventInfo, bool>>? OutboxProcessorFilter { get; set; }
/// <summary>
/// Period time of <see cref="InboxProcessor"/> and <see cref="OutboxSender"/>
/// Default: 2 seconds
@ -34,7 +45,7 @@ public class AbpEventBusBoxesOptions
/// Default: 2 hours
/// </summary>
public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; }
/// <summary>
/// Default: true
/// </summary>
@ -44,7 +55,9 @@ public class AbpEventBusBoxesOptions
{
CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6);
InboxWaitingEventMaxCount = 1000;
InboxProcessorFilter = null;
OutboxWaitingEventMaxCount = 1000;
OutboxProcessorFilter = null;
PeriodTimeSpan = TimeSpan.FromSeconds(2);
DistributedLockWaitDuration = TimeSpan.FromSeconds(15);
WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2);

2
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs

@ -92,7 +92,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
while (true)
{
var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken);
var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken);
if (waitingEvents.Count <= 0)
{
break;

10
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs

@ -77,14 +77,14 @@ public class OutboxSender : IOutboxSender, ITransientDependency
{
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, EventBusBoxesOptions.OutboxProcessorFilter, StoppingToken);
if (waitingEvents.Count <= 0)
{
break;
}
Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox.");
if (EventBusBoxesOptions.BatchPublishOutboxEvents)
{
await PublishOutgoingMessagesInBatchAsync(waitingEvents);
@ -119,7 +119,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency
);
await Outbox.DeleteAsync(waitingEvent.Id);
Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}");
}
}
@ -129,9 +129,9 @@ public class OutboxSender : IOutboxSender, ITransientDependency
await DistributedEventBus
.AsSupportsEventBoxes()
.PublishManyFromOutboxAsync(waitingEvents, OutboxConfig);
await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray());
Logger.LogInformation($"Sent {waitingEvents.Count} events to message broker");
}
}

11
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
@ -50,16 +51,24 @@ public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInb
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IIncomingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken);
Expression<Func<IncomingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IIncomingEventInfo, IncomingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsQueryable()
.Where(x => !x.Processed)
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.As<IMongoQueryable<IncomingEventRecord>>()
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords

11
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
@ -40,14 +41,22 @@ public class MongoDbContextEventOutbox<TMongoDbContext> : IMongoDbContextEventOu
}
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IOutgoingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox)await MongoDbContextProvider.GetDbContextAsync(cancellationToken);
Expression<Func<OutgoingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IOutgoingEventInfo, OutgoingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.OutgoingEvents.AsQueryable()
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.As<IMongoQueryable<OutgoingEventRecord>>()
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords

Loading…
Cancel
Save