diff --git a/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs b/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs index a8b147e349..fb66702186 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobExecuter.cs @@ -46,8 +46,6 @@ namespace Volo.Abp.BackgroundJobs } catch (Exception ex) { - context.Result = JobExecutionResult.Failed; - Logger.LogException(ex); throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex) diff --git a/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobOptions.cs b/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobOptions.cs index b36fe46d13..40b7ded802 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobOptions.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/BackgroundJobOptions.cs @@ -7,10 +7,11 @@ namespace Volo.Abp.BackgroundJobs { public Dictionary JobTypes { get; } + //TODO: Implement for all providers! (Hangfire does not implement yet) /// /// Default: true. /// - public bool IsEnabled { get; set; } = true; + public bool IsJobExecutionEnabled { get; set; } = true; public BackgroundJobOptions() { diff --git a/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs b/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs index 90e8503641..7cdeed85bf 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.Abstractions/Volo/Abp/BackgroundJobs/JobExecutionContext.cs @@ -8,13 +8,10 @@ namespace Volo.Abp.BackgroundJobs public object JobArgs { get; } - public JobExecutionResult Result { get; set; } - public JobExecutionContext(Type jobType, object jobArgs) { JobType = jobType; JobArgs = jobArgs; - Result = JobExecutionResult.Success; } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs b/framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs index fe94ae1fec..8d89cde379 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireJobExecutionAdapter.cs @@ -20,10 +20,6 @@ namespace Volo.Abp.BackgroundJobs.Hangfire var context = new JobExecutionContext(jobType, args); JobExecuter.Execute(context); - if (context.Result == JobExecutionResult.Failed) - { - throw new AbpException("Job failed"); - } } } } diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs index f57704f897..590afbef1b 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using Volo.Abp.Modularity; using Volo.Abp.RabbitMQ; @@ -12,7 +13,23 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ { public override void ConfigureServices(ServiceConfigurationContext context) { + context.Services.AddSingleton(typeof(JobListener<>)); //TODO: Introduce and use interface! + context.Services.AddAssemblyOf(); } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + //TODO: Move all to another class and stop listeners when needed! + var options = context.ServiceProvider.GetRequiredService>().Value; + if (options.IsJobExecutionEnabled) + { + foreach (var jobType in options.JobTypes.Values) + { + var jobListener = (IJobListener)context.ServiceProvider.GetRequiredService(typeof(JobListener<>).MakeGenericType(jobType)); + jobListener.Start(); + } + } + } } } diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobListener.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobListener.cs new file mode 100644 index 0000000000..1bfb96fec6 --- /dev/null +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobListener.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.BackgroundJobs.RabbitMQ +{ + public interface IJobListener + { + void Start(); + void Stop(); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobListener.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobListener.cs new file mode 100644 index 0000000000..19252a03ad --- /dev/null +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobListener.cs @@ -0,0 +1,62 @@ +using System; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Volo.Abp.RabbitMQ; + +namespace Volo.Abp.BackgroundJobs.RabbitMQ +{ + public class JobListener : IJobListener + { + protected string JobName { get; } + protected IBackgroundJobExecuter JobExecuter { get; } + protected IChannelPool ChannelPool { get; } + protected IRabbitMqSerializer Serializer { get; } + protected Type ArgsType { get; } + + protected IChannelAccessor ChannelAccessor { get; private set; } + protected EventingBasicConsumer Consumer { get; private set; } + + public JobListener( + IChannelPool channelPool, + IBackgroundJobExecuter jobExecuter, + IRabbitMqSerializer serializer) + { + ChannelPool = channelPool; + JobExecuter = jobExecuter; + Serializer = serializer; + ArgsType = BackgroundJobArgsHelper.GetJobArgsType(typeof(TJob)); + JobName = BackgroundJobNameAttribute.GetName(ArgsType); + } + + public void Start() + { + var queueName = "BackgroundJobs." + JobName; //TODO: Make prefix optional + + ChannelAccessor = ChannelPool.Acquire(queueName); + + //TODO: How to ensure that queue is created! + + Consumer = new EventingBasicConsumer(ChannelAccessor.Channel); + Consumer.Received += MessageReceived; + + //TODO: What BasicConsume returns? + ChannelAccessor.Channel.BasicConsume( + queue: queueName, + autoAck: false, + consumer: Consumer + ); + } + + private void MessageReceived(object sender, BasicDeliverEventArgs e) + { + var context = new JobExecutionContext(typeof(TJob), Serializer.Deserialize(e.Body, ArgsType)); + JobExecuter.Execute(context); + //TODO: How to ACK on success or Reject on failure? + } + + public void Stop() + { + ChannelAccessor.Dispose(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobsModule.cs b/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobsModule.cs index 01d8ddba05..ea405068bb 100644 --- a/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobsModule.cs +++ b/framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobsModule.cs @@ -23,7 +23,7 @@ namespace Volo.Abp.BackgroundJobs public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService>().Value; - if (options.IsEnabled) + if (options.IsJobExecutionEnabled) { context.ServiceProvider .GetRequiredService() diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs similarity index 61% rename from framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs rename to framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs index 01e2e52d06..cf3bf40455 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqModule.cs @@ -13,5 +13,11 @@ namespace Volo.Abp.RabbitMQ { context.Services.AddAssemblyOf(); } + + public override void OnApplicationShutdown(ApplicationShutdownContext context) + { + context.ServiceProvider.GetRequiredService().Dispose(); + //TODO: Dispose channel pool when it's implemented! + } } } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqOptions.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqOptions.cs index a971b85d5e..590aab97b7 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqOptions.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/AbpRabbitMqOptions.cs @@ -4,9 +4,12 @@ { public RabbitMqConnections Connections { get; } + public QueueDictionary Queues { get; } + public AbpRabbitMqOptions() { Connections = new RabbitMqConnections(); + Queues = new QueueDictionary(); } } } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index a173564d52..e48218a7cf 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -1,4 +1,5 @@ -using System.Collections.Concurrent; +using System; +using System.Collections.Concurrent; using System.Collections.Generic; using Microsoft.Extensions.Options; using RabbitMQ.Client; @@ -6,12 +7,14 @@ using Volo.Abp.DependencyInjection; namespace Volo.Abp.RabbitMQ { - public class ConnectionPool : IConnectionPool, ISingletonDependency + public class ConnectionPool : IConnectionPool, IDisposable, ISingletonDependency { protected AbpRabbitMqOptions Options { get; } protected ConcurrentDictionary Connections { get; } + private bool _isDisposed; + public ConnectionPool(IOptions options) { Options = options.Value; @@ -28,5 +31,27 @@ namespace Volo.Abp.RabbitMQ return connectionFactory.CreateConnection(); }); } + + public void Dispose() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + + foreach (var connection in Connections.Values) + { + try + { + connection.Dispose(); + } + catch + { + + } + } + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs index 8e0b06bebd..35d88b7113 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/IConnectionPool.cs @@ -1,8 +1,9 @@ -using RabbitMQ.Client; +using System; +using RabbitMQ.Client; namespace Volo.Abp.RabbitMQ { - public interface IConnectionPool + public interface IConnectionPool : IDisposable { IConnection Get(string connectionName = null); } diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDictionary.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDictionary.cs new file mode 100644 index 0000000000..9d0015d51c --- /dev/null +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueDictionary.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Volo.Abp.RabbitMQ +{ + public class QueueDictionary : Dictionary + { + + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueOptions.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueOptions.cs new file mode 100644 index 0000000000..55905c9e3b --- /dev/null +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/QueueOptions.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using JetBrains.Annotations; + +namespace Volo.Abp.RabbitMQ +{ + public class QueueOptions + { + [NotNull] + public string Name { get; } + + public bool Durable { get; set; } + + public bool Exclusive { get; set; } + + public bool AutoDelete { get; set; } + + public IDictionary Arguments { get; } + + public QueueOptions([NotNull] string name, bool durable = true, bool exclusive = false, bool autoDelete = false) + { + Name = name; + Durable = durable; + Exclusive = exclusive; + AutoDelete = autoDelete; + Arguments = new Dictionary(); + } + } +} \ No newline at end of file