Browse Source

Add correlation ID support to RabbitMQ's JobQueue

Resolve #24753
pull/24755/head
maliming 1 week ago
parent
commit
d01fc48262
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 11
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

11
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

@ -12,6 +12,7 @@ using RabbitMQ.Client.Events;
using Volo.Abp.ExceptionHandling; using Volo.Abp.ExceptionHandling;
using Volo.Abp.RabbitMQ; using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading; using Volo.Abp.Threading;
using Volo.Abp.Tracing;
namespace Volo.Abp.BackgroundJobs.RabbitMQ; namespace Volo.Abp.BackgroundJobs.RabbitMQ;
@ -33,6 +34,7 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
protected IBackgroundJobExecuter JobExecuter { get; } protected IBackgroundJobExecuter JobExecuter { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; } protected IServiceScopeFactory ServiceScopeFactory { get; }
protected IExceptionNotifier ExceptionNotifier { get; } protected IExceptionNotifier ExceptionNotifier { get; }
protected ICorrelationIdProvider CorrelationIdProvider { get; }
protected SemaphoreSlim SyncObj = new SemaphoreSlim(1, 1); protected SemaphoreSlim SyncObj = new SemaphoreSlim(1, 1);
protected bool IsDiposed { get; private set; } protected bool IsDiposed { get; private set; }
@ -44,7 +46,8 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
IRabbitMqSerializer serializer, IRabbitMqSerializer serializer,
IBackgroundJobExecuter jobExecuter, IBackgroundJobExecuter jobExecuter,
IServiceScopeFactory serviceScopeFactory, IServiceScopeFactory serviceScopeFactory,
IExceptionNotifier exceptionNotifier) IExceptionNotifier exceptionNotifier,
ICorrelationIdProvider correlationIdProvider)
{ {
AbpBackgroundJobOptions = backgroundJobOptions.Value; AbpBackgroundJobOptions = backgroundJobOptions.Value;
AbpRabbitMqBackgroundJobOptions = rabbitMqAbpBackgroundJobOptions.Value; AbpRabbitMqBackgroundJobOptions = rabbitMqAbpBackgroundJobOptions.Value;
@ -52,6 +55,7 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
JobExecuter = jobExecuter; JobExecuter = jobExecuter;
ServiceScopeFactory = serviceScopeFactory; ServiceScopeFactory = serviceScopeFactory;
ExceptionNotifier = exceptionNotifier; ExceptionNotifier = exceptionNotifier;
CorrelationIdProvider = correlationIdProvider;
ChannelPool = channelPool; ChannelPool = channelPool;
JobConfiguration = AbpBackgroundJobOptions.GetJob(typeof(TArgs)); JobConfiguration = AbpBackgroundJobOptions.GetJob(typeof(TArgs));
@ -201,7 +205,10 @@ public class JobQueue<TArgs> : IJobQueue<TArgs>
try try
{ {
await JobExecuter.ExecuteAsync(context); using (CorrelationIdProvider.Change(ea.BasicProperties.CorrelationId))
{
await JobExecuter.ExecuteAsync(context);
}
await ChannelAccessor!.Channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false); await ChannelAccessor!.Channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
} }
catch (BackgroundJobExecutionException) catch (BackgroundJobExecutionException)

Loading…
Cancel
Save