Browse Source

Merge pull request #21716 from abpframework/GetWaitingEventsAsync

Added `filter` to `GetWaitingEventsAsync` method.
pull/21752/head
Engincan VESKE 1 year ago
committed by GitHub
parent
commit
edae8a6201
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 10
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  2. 10
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  3. 1
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs
  4. 1
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs
  5. 3
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs
  6. 3
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs
  7. 17
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs
  8. 15
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs
  9. 38
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs
  10. 2
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs
  11. 2
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs
  12. 15
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs
  13. 14
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs
  14. 17
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs
  15. 11
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs
  16. 11
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

10
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
@ -36,14 +37,21 @@ public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IIncomingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
Expression<Func<IncomingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IIncomingEventInfo, IncomingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsNoTracking()
.Where(x => !x.Processed)
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);

10
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
@ -28,13 +29,20 @@ public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext
}
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IOutgoingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
Expression<Func<OutgoingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IOutgoingEventInfo, OutgoingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.OutgoingEvents
.AsNoTracking()
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);

1
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs

@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class IncomingEventRecord :
BasicAggregateRoot<Guid>,
IIncomingEventInfo,
IHasExtraProperties,
IHasCreationTime
{

1
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs

@ -8,6 +8,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class OutgoingEventRecord :
BasicAggregateRoot<Guid>,
IOutgoingEventInfo,
IHasExtraProperties,
IHasCreationTime
{

3
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventInbox.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
@ -9,7 +10,7 @@ public interface IEventInbox
{
Task EnqueueAsync(IncomingEventInfo incomingEvent);
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default);
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IIncomingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default);
Task MarkAsProcessedAsync(Guid id);

3
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IEventOutbox.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
@ -9,7 +10,7 @@ public interface IEventOutbox
{
Task EnqueueAsync(OutgoingEventInfo outgoingEvent);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IOutgoingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default);
Task DeleteAsync(Guid id);

17
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IIncomingEventInfo.cs

@ -0,0 +1,17 @@
using System;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public interface IIncomingEventInfo : IHasExtraProperties
{
Guid Id { get; }
string MessageId { get; }
string EventName { get; }
byte[] EventData { get; }
DateTime CreationTime { get; }
}

15
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IOutgoingEventInfo.cs

@ -0,0 +1,15 @@
using System;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public interface IOutgoingEventInfo : IHasExtraProperties
{
Guid Id { get; }
string EventName { get; }
byte[] EventData { get; }
DateTime CreationTime { get; }
}

38
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/InboxOutboxFilterExpressionTransformer.cs

@ -0,0 +1,38 @@
using System;
using System.Linq.Expressions;
namespace Volo.Abp.EventBus.Distributed;
public static class InboxOutboxFilterExpressionTransformer
{
public static Expression<Func<TTarget, bool>> Transform<TOriginal, TTarget>(Expression<Func<TOriginal, bool>> originalExpression)
{
var originalParam = originalExpression.Parameters[0];
var newParam = Expression.Parameter(typeof(TTarget), originalParam.Name);
var body = ReplaceParameter(originalExpression.Body, originalParam, newParam);
return Expression.Lambda<Func<TTarget, bool>>(body, newParam);
}
private static Expression ReplaceParameter(Expression body, ParameterExpression oldParam, ParameterExpression newParam)
{
var visitor = new ParameterReplacer(oldParam, newParam);
return visitor.Visit(body);
}
private class ParameterReplacer : ExpressionVisitor
{
private readonly ParameterExpression _oldParam;
private readonly ParameterExpression _newParam;
public ParameterReplacer(ParameterExpression oldParam, ParameterExpression newParam)
{
_oldParam = oldParam;
_newParam = newParam;
}
protected override Expression VisitParameter(ParameterExpression node)
{
return node == _oldParam ? _newParam : base.VisitParameter(node);
}
}
}

2
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs

@ -4,7 +4,7 @@ using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public class IncomingEventInfo : IHasExtraProperties
public class IncomingEventInfo : IIncomingEventInfo
{
public static int MaxEventNameLength { get; set; } = 256;

2
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs

@ -4,7 +4,7 @@ using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
public class OutgoingEventInfo : IHasExtraProperties
public class OutgoingEventInfo : IOutgoingEventInfo
{
public static int MaxEventNameLength { get; set; } = 256;

15
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpEventBusBoxesOptions.cs

@ -1,4 +1,5 @@
using System;
using System.Linq.Expressions;
namespace Volo.Abp.EventBus.Distributed;
@ -14,11 +15,21 @@ public class AbpEventBusBoxesOptions
/// </summary>
public int InboxWaitingEventMaxCount { get; set; }
/// <summary>
/// Default: null, means all events
/// </summary>
public Expression<Func<IIncomingEventInfo, bool>>? InboxProcessorFilter { get; set; }
/// <summary>
/// Default: 1000
/// </summary>
public int OutboxWaitingEventMaxCount { get; set; }
/// <summary>
/// Default: null, means all events
/// </summary>
public Expression<Func<IOutgoingEventInfo, bool>>? OutboxProcessorFilter { get; set; }
/// <summary>
/// Period time of <see cref="InboxProcessor"/> and <see cref="OutboxSender"/>
/// Default: 2 seconds
@ -34,7 +45,7 @@ public class AbpEventBusBoxesOptions
/// Default: 2 hours
/// </summary>
public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; }
/// <summary>
/// Default: true
/// </summary>
@ -44,7 +55,9 @@ public class AbpEventBusBoxesOptions
{
CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6);
InboxWaitingEventMaxCount = 1000;
InboxProcessorFilter = null;
OutboxWaitingEventMaxCount = 1000;
OutboxProcessorFilter = null;
PeriodTimeSpan = TimeSpan.FromSeconds(2);
DistributedLockWaitDuration = TimeSpan.FromSeconds(15);
WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2);

14
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/InboxProcessor.cs

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -27,7 +28,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
protected DateTime? LastCleanTime { get; set; }
protected string DistributedLockName { get; private set; } = default!;
protected string DistributedLockName { get; set; } = default!;
public ILogger<InboxProcessor> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
@ -60,7 +61,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
await RunAsync();
}
public Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default)
public virtual Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default)
{
InboxConfig = inboxConfig;
Inbox = (IEventInbox)ServiceProvider.GetRequiredService(inboxConfig.ImplementationType);
@ -69,7 +70,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken = default)
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
StoppingTokenSource.Cancel();
Timer.Stop(cancellationToken);
@ -92,7 +93,7 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
while (true)
{
var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken);
var waitingEvents = await GetWaitingEventsAsync();
if (waitingEvents.Count <= 0)
{
break;
@ -129,6 +130,11 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
}
}
protected virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync()
{
return await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, EventBusBoxesOptions.InboxProcessorFilter, StoppingToken);
}
protected virtual async Task DeleteOldEventsAsync()
{
if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now)

17
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutboxSender.cs

@ -22,7 +22,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency
protected IEventOutbox Outbox { get; private set; } = default!;
protected OutboxConfig OutboxConfig { get; private set; } = default!;
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected string DistributedLockName { get; private set; } = default!;
protected string DistributedLockName { get; set; } = default!;
public ILogger<OutboxSender> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
@ -77,14 +77,14 @@ public class OutboxSender : IOutboxSender, ITransientDependency
{
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
var waitingEvents = await GetWaitingEventsAsync();
if (waitingEvents.Count <= 0)
{
break;
}
Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox.");
if (EventBusBoxesOptions.BatchPublishOutboxEvents)
{
await PublishOutgoingMessagesInBatchAsync(waitingEvents);
@ -107,6 +107,11 @@ public class OutboxSender : IOutboxSender, ITransientDependency
}
}
protected virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync()
{
return await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, EventBusBoxesOptions.OutboxProcessorFilter, StoppingToken);
}
protected virtual async Task PublishOutgoingMessagesAsync(List<OutgoingEventInfo> waitingEvents)
{
foreach (var waitingEvent in waitingEvents)
@ -119,7 +124,7 @@ public class OutboxSender : IOutboxSender, ITransientDependency
);
await Outbox.DeleteAsync(waitingEvent.Id);
Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}");
}
}
@ -129,9 +134,9 @@ public class OutboxSender : IOutboxSender, ITransientDependency
await DistributedEventBus
.AsSupportsEventBoxes()
.PublishManyFromOutboxAsync(waitingEvents, OutboxConfig);
await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray());
Logger.LogInformation($"Sent {waitingEvents.Count} events to message broker");
}
}

11
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
@ -50,16 +51,24 @@ public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInb
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IIncomingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken);
Expression<Func<IncomingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IIncomingEventInfo, IncomingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsQueryable()
.Where(x => !x.Processed)
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.As<IMongoQueryable<IncomingEventRecord>>()
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords

11
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
@ -40,14 +41,22 @@ public class MongoDbContextEventOutbox<TMongoDbContext> : IMongoDbContextEventOu
}
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, Expression<Func<IOutgoingEventInfo, bool>>? filter = null, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox)await MongoDbContextProvider.GetDbContextAsync(cancellationToken);
Expression<Func<OutgoingEventRecord, bool>>? transformedFilter = null;
if (filter != null)
{
transformedFilter = InboxOutboxFilterExpressionTransformer.Transform<IOutgoingEventInfo, OutgoingEventRecord>(filter)!;
}
var outgoingEventRecords = await dbContext
.OutgoingEvents.AsQueryable()
.WhereIf(transformedFilter != null, transformedFilter!)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.As<IMongoQueryable<OutgoingEventRecord>>()
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords

Loading…
Cancel
Save