Browse Source

Merge pull request #9725 from abpframework/liangshiwei/rabbitmq

Implemented delayed jobs for RabbitMQ integration
pull/10092/head
Halil İbrahim Kalkan 5 years ago
committed by GitHub
parent
commit
21d81b04cc
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpRabbitMqBackgroundJobOptions.cs
  2. 22
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs
  3. 35
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueConfiguration.cs

6
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpRabbitMqBackgroundJobOptions.cs

@ -15,10 +15,16 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
/// </summary>
public string DefaultQueueNamePrefix { get; set; }
/// <summary>
/// Default value: "AbpBackgroundJobsDelayed."
/// </summary>
public string DefaultDelayedQueueNamePrefix { get; set;}
public AbpRabbitMqBackgroundJobOptions()
{
JobQueues = new Dictionary<Type, JobQueueConfiguration>();
DefaultQueueNamePrefix = "AbpBackgroundJobs.";
DefaultDelayedQueueNamePrefix = "AbpBackgroundJobsDelayed.";
}
}
}

22
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
@ -64,7 +65,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
return AbpRabbitMqBackgroundJobOptions.JobQueues.GetOrDefault(typeof(TArgs)) ??
new JobQueueConfiguration(
typeof(TArgs),
AbpRabbitMqBackgroundJobOptions.DefaultQueueNamePrefix + JobConfiguration.JobName
AbpRabbitMqBackgroundJobOptions.DefaultQueueNamePrefix + JobConfiguration.JobName,
AbpRabbitMqBackgroundJobOptions.DefaultDelayedQueueNamePrefix + JobConfiguration.JobName
);
}
@ -133,6 +135,9 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
var result = QueueConfiguration.Declare(ChannelAccessor.Channel);
Logger.LogDebug($"RabbitMQ Queue '{QueueConfiguration.QueueName}' has {result.MessageCount} messages and {result.ConsumerCount} consumers.");
// Declare delayed queue
QueueConfiguration.DeclareDelayed(ChannelAccessor.Channel);
if (AbpBackgroundJobOptions.IsJobExecutionEnabled)
{
Consumer = new AsyncEventingBasicConsumer(ChannelAccessor.Channel);
@ -154,12 +159,21 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
//TODO: How to handle priority & delay?
//TODO: How to handle priority
var routingKey = QueueConfiguration.QueueName;
var basicProperties = CreateBasicPropertiesToPublish();
if (delay.HasValue)
{
routingKey = QueueConfiguration.DelayedQueueName;
basicProperties.Expiration = delay.Value.TotalMilliseconds.ToString();
}
ChannelAccessor.Channel.BasicPublish(
exchange: "",
routingKey: QueueConfiguration.QueueName,
basicProperties: CreateBasicPropertiesToPublish(),
routingKey: routingKey,
basicProperties: basicProperties,
body: Serializer.Serialize(args)
);

35
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueConfiguration.cs

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using Volo.Abp.RabbitMQ;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
@ -9,21 +11,42 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
public string ConnectionName { get; set; }
public string DelayedQueueName { get; set; }
public JobQueueConfiguration(
Type jobArgsType,
string queueName,
Type jobArgsType,
string queueName,
string delayedQueueName,
string connectionName = null,
bool durable = true,
bool exclusive = false,
bool autoDelete = false)
: base(
queueName,
durable,
exclusive,
queueName,
durable,
exclusive,
autoDelete)
{
JobArgsType = jobArgsType;
ConnectionName = connectionName;
DelayedQueueName = delayedQueueName;
}
public virtual QueueDeclareOk DeclareDelayed(IModel channel)
{
var delayedArguments = new Dictionary<string, object>(Arguments)
{
["x-dead-letter-routing-key"] = QueueName,
["x-dead-letter-exchange"] = string.Empty
};
return channel.QueueDeclare(
queue: DelayedQueueName,
durable: Durable,
exclusive: Exclusive,
autoDelete: AutoDelete,
arguments: delayedArguments
);
}
}
}
}

Loading…
Cancel
Save