Browse Source

RabbitMQ integration.

pull/395/head
Halil ibrahim Kalkan 8 years ago
parent
commit
c9335c568a
  1. 2
      framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs
  2. 3
      framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobOptions.cs
  3. 3
      framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs
  4. 4
      framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs
  5. 17
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs
  6. 8
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobListener.cs
  7. 62
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobListener.cs
  8. 2
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobsModule.cs
  9. 6
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs
  10. 3
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqOptions.cs
  11. 29
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs
  12. 5
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs
  13. 9
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDictionary.cs
  14. 28
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueOptions.cs

2
framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs

@ -46,8 +46,6 @@ namespace Volo.Abp.BackgroundJobs
}
catch (Exception ex)
{
context.Result = JobExecutionResult.Failed;
Logger.LogException(ex);
throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)

3
framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobOptions.cs

@ -7,10 +7,11 @@ namespace Volo.Abp.BackgroundJobs
{
public Dictionary<string, Type> JobTypes { get; }
//TODO: Implement for all providers! (Hangfire does not implement yet)
/// <summary>
/// Default: true.
/// </summary>
public bool IsEnabled { get; set; } = true;
public bool IsJobExecutionEnabled { get; set; } = true;
public BackgroundJobOptions()
{

3
framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs

@ -8,13 +8,10 @@ namespace Volo.Abp.BackgroundJobs
public object JobArgs { get; }
public JobExecutionResult Result { get; set; }
public JobExecutionContext(Type jobType, object jobArgs)
{
JobType = jobType;
JobArgs = jobArgs;
Result = JobExecutionResult.Success;
}
}
}

4
framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs

@ -20,10 +20,6 @@ namespace Volo.Abp.BackgroundJobs.Hangfire
var context = new JobExecutionContext(jobType, args);
JobExecuter.Execute(context);
if (context.Result == JobExecutionResult.Failed)
{
throw new AbpException("Job failed");
}
}
}
}

17
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs

@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.Modularity;
using Volo.Abp.RabbitMQ;
@ -12,7 +13,23 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddSingleton(typeof(JobListener<>)); //TODO: Introduce and use interface!
context.Services.AddAssemblyOf<AbpBackgroundJobsRabbitMqModule>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
//TODO: Move all to another class and stop listeners when needed!
var options = context.ServiceProvider.GetRequiredService<IOptions<BackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
foreach (var jobType in options.JobTypes.Values)
{
var jobListener = (IJobListener)context.ServiceProvider.GetRequiredService(typeof(JobListener<>).MakeGenericType(jobType));
jobListener.Start();
}
}
}
}
}

8
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobListener.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public interface IJobListener
{
void Start();
void Stop();
}
}

62
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobListener.cs

@ -0,0 +1,62 @@
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Volo.Abp.RabbitMQ;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public class JobListener<TJob> : IJobListener
{
protected string JobName { get; }
protected IBackgroundJobExecuter JobExecuter { get; }
protected IChannelPool ChannelPool { get; }
protected IRabbitMqSerializer Serializer { get; }
protected Type ArgsType { get; }
protected IChannelAccessor ChannelAccessor { get; private set; }
protected EventingBasicConsumer Consumer { get; private set; }
public JobListener(
IChannelPool channelPool,
IBackgroundJobExecuter jobExecuter,
IRabbitMqSerializer serializer)
{
ChannelPool = channelPool;
JobExecuter = jobExecuter;
Serializer = serializer;
ArgsType = BackgroundJobArgsHelper.GetJobArgsType(typeof(TJob));
JobName = BackgroundJobNameAttribute.GetName(ArgsType);
}
public void Start()
{
var queueName = "BackgroundJobs." + JobName; //TODO: Make prefix optional
ChannelAccessor = ChannelPool.Acquire(queueName);
//TODO: How to ensure that queue is created!
Consumer = new EventingBasicConsumer(ChannelAccessor.Channel);
Consumer.Received += MessageReceived;
//TODO: What BasicConsume returns?
ChannelAccessor.Channel.BasicConsume(
queue: queueName,
autoAck: false,
consumer: Consumer
);
}
private void MessageReceived(object sender, BasicDeliverEventArgs e)
{
var context = new JobExecutionContext(typeof(TJob), Serializer.Deserialize(e.Body, ArgsType));
JobExecuter.Execute(context);
//TODO: How to ACK on success or Reject on failure?
}
public void Stop()
{
ChannelAccessor.Dispose();
}
}
}

2
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobsModule.cs

@ -23,7 +23,7 @@ namespace Volo.Abp.BackgroundJobs
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<BackgroundJobOptions>>().Value;
if (options.IsEnabled)
if (options.IsJobExecutionEnabled)
{
context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()

6
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs → framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs

@ -13,5 +13,11 @@ namespace Volo.Abp.RabbitMQ
{
context.Services.AddAssemblyOf<AbpRabbitMqModule>();
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
context.ServiceProvider.GetRequiredService<IConnectionPool>().Dispose();
//TODO: Dispose channel pool when it's implemented!
}
}
}

3
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqOptions.cs

@ -4,9 +4,12 @@
{
public RabbitMqConnections Connections { get; }
public QueueDictionary Queues { get; }
public AbpRabbitMqOptions()
{
Connections = new RabbitMqConnections();
Queues = new QueueDictionary();
}
}
}

29
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
@ -6,12 +7,14 @@ using Volo.Abp.DependencyInjection;
namespace Volo.Abp.RabbitMQ
{
public class ConnectionPool : IConnectionPool, ISingletonDependency
public class ConnectionPool : IConnectionPool, IDisposable, ISingletonDependency
{
protected AbpRabbitMqOptions Options { get; }
protected ConcurrentDictionary<string, IConnection> Connections { get; }
private bool _isDisposed;
public ConnectionPool(IOptions<AbpRabbitMqOptions> options)
{
Options = options.Value;
@ -28,5 +31,27 @@ namespace Volo.Abp.RabbitMQ
return connectionFactory.CreateConnection();
});
}
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
foreach (var connection in Connections.Values)
{
try
{
connection.Dispose();
}
catch
{
}
}
}
}
}

5
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs

@ -1,8 +1,9 @@
using RabbitMQ.Client;
using System;
using RabbitMQ.Client;
namespace Volo.Abp.RabbitMQ
{
public interface IConnectionPool
public interface IConnectionPool : IDisposable
{
IConnection Get(string connectionName = null);
}

9
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDictionary.cs

@ -0,0 +1,9 @@
using System.Collections.Generic;
namespace Volo.Abp.RabbitMQ
{
public class QueueDictionary : Dictionary<string, QueueOptions>
{
}
}

28
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueOptions.cs

@ -0,0 +1,28 @@
using System.Collections.Generic;
using JetBrains.Annotations;
namespace Volo.Abp.RabbitMQ
{
public class QueueOptions
{
[NotNull]
public string Name { get; }
public bool Durable { get; set; }
public bool Exclusive { get; set; }
public bool AutoDelete { get; set; }
public IDictionary<string, object> Arguments { get; }
public QueueOptions([NotNull] string name, bool durable = true, bool exclusive = false, bool autoDelete = false)
{
Name = name;
Durable = durable;
Exclusive = exclusive;
AutoDelete = autoDelete;
Arguments = new Dictionary<string, object>();
}
}
}
Loading…
Cancel
Save