diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs index 863219e7ef..5f79068ae2 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs @@ -1,13 +1,14 @@ using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; using Volo.Abp.Modularity; using Volo.Abp.RabbitMQ; +using Volo.Abp.Threading; namespace Volo.Abp.BackgroundJobs.RabbitMQ { [DependsOn( typeof(AbpBackgroundJobsAbstractionsModule), - typeof(AbpRabbitMqModule) + typeof(AbpRabbitMqModule), + typeof(AbpThreadingModule) )] public class AbpBackgroundJobsRabbitMqModule : AbpModule { @@ -20,16 +21,9 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ public override void OnApplicationInitialization(ApplicationInitializationContext context) { - //TODO: Move all to another class and stop listeners when needed! - var options = context.ServiceProvider.GetRequiredService>().Value; - if (options.IsJobExecutionEnabled) - { - foreach (var jobType in options.JobTypes.Values) - { - var jobListener = (IJobListener)context.ServiceProvider.GetRequiredService(typeof(JobListener<>).MakeGenericType(jobType)); - jobListener.Start(); - } - } + context.ServiceProvider + .GetRequiredService() + .Start(); } } } diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueue.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueue.cs index 40678e494c..ca4b393eba 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueue.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueue.cs @@ -1,12 +1,15 @@ using System; using System.Threading.Tasks; +using Volo.Abp.Threading; namespace Volo.Abp.BackgroundJobs.RabbitMQ { - public interface IJobQueue : IDisposable + public interface IJobQueue : IRunnable, IDisposable { - Task Enqueue(TArgs args); - - Task StartAsync(); + Task EnqueueAsync( + TArgs args, + BackgroundJobPriority priority = BackgroundJobPriority.Normal, + TimeSpan? delay = null + ); } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs new file mode 100644 index 0000000000..da560d1ed2 --- /dev/null +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs @@ -0,0 +1,9 @@ +using Volo.Abp.Threading; + +namespace Volo.Abp.BackgroundJobs.RabbitMQ +{ + public interface IJobQueueManager : IRunnable + { + IJobQueue Get(); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs index 2744ec7edb..b05fbca525 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs @@ -1,6 +1,9 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Nito.AsyncEx; using RabbitMQ.Client; @@ -9,6 +12,8 @@ using Volo.Abp.RabbitMQ; namespace Volo.Abp.BackgroundJobs.RabbitMQ { + //TODO: Needs refactoring + public class JobQueue : IJobQueue { protected Type JobType { get; } @@ -17,6 +22,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ protected IChannelAccessor ChannelAccessor { get; private set; } protected EventingBasicConsumer Consumer { get; private set; } + + public ILogger> Logger { get; set; } protected IChannelPool ChannelPool { get; } protected AbpRabbitMqOptions RabbitMqOptions { get; } @@ -43,9 +50,14 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ JobName = BackgroundJobNameAttribute.GetName(); JobType = BackgroundJobOptions.GetJobType(JobName); QueueName = "BackgroundJobs." + JobName; //TODO: Make prefix optional + + Logger = NullLogger>.Instance; } - public virtual async Task Enqueue(TArgs args) + public virtual async Task EnqueueAsync( + TArgs args, + BackgroundJobPriority priority = BackgroundJobPriority.Normal, + TimeSpan? delay = null) { CheckDisposed(); @@ -53,20 +65,31 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ { await EnsureInitializedAsync(); - await PublishAsync(args); + await PublishAsync(args, priority, delay); return null; } } - public Task StartAsync() + public async Task StartAsync(CancellationToken cancellationToken = default) { + CheckDisposed(); + if (!BackgroundJobOptions.IsJobExecutionEnabled) { - return Task.CompletedTask; + return; + } + + using (await SyncObj.LockAsync()) + { + await EnsureInitializedAsync(); } + } - return EnsureInitializedAsync(); + public Task StopAsync(CancellationToken cancellationToken = default) + { + Dispose(); + return Task.CompletedTask; } public virtual void Dispose() @@ -81,7 +104,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ ChannelAccessor?.Dispose(); } - public virtual Task EnsureInitializedAsync() + protected virtual Task EnsureInitializedAsync() { if (ChannelAccessor != null) { @@ -93,7 +116,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ var queueOptions = RabbitMqOptions.Queues.GetOrDefault(QueueName) ?? new QueueOptions(QueueName); - queueOptions.Declare(ChannelAccessor.Channel); + var result = queueOptions.Declare(ChannelAccessor.Channel); + Logger.LogDebug($"RabbitMQ Queue '{QueueName}' has {result.MessageCount} messages and {result.ConsumerCount} consumers."); if (BackgroundJobOptions.IsJobExecutionEnabled) { @@ -111,8 +135,13 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ return Task.CompletedTask; } - protected virtual Task PublishAsync(TArgs args) + protected virtual Task PublishAsync( + TArgs args, + BackgroundJobPriority priority = BackgroundJobPriority.Normal, + TimeSpan? delay = null) { + //TODO: How to handle priority & delay? + ChannelAccessor.Channel.BasicPublish( exchange: "", routingKey: QueueName, @@ -137,9 +166,21 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ Serializer.Deserialize(ea.Body, typeof(TArgs)) ); - JobExecuter.Execute(context); - - //TODO: How to ACK on success or Reject on failure? + try + { + JobExecuter.Execute(context); + ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); + } + catch (BackgroundJobExecutionException) + { + //TODO: Reject like that? + ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true); + } + catch (Exception) + { + //TODO: Reject like that? + ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); + } } private void CheckDisposed() diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs new file mode 100644 index 0000000000..02e27ab7a0 --- /dev/null +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Threading; + +namespace Volo.Abp.BackgroundJobs.RabbitMQ +{ + public class JobQueueManager : IJobQueueManager, ISingletonDependency + { + protected ConcurrentDictionary JobQueues { get; } + + protected IServiceProvider ServiceProvider { get; } + protected BackgroundJobOptions Options { get; } + + public JobQueueManager( + IOptions options, + IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + Options = options.Value; + JobQueues = new ConcurrentDictionary(); + } + + public async Task StartAsync(CancellationToken cancellationToken = default) + { + if (!Options.IsJobExecutionEnabled) + { + return; + } + + foreach (var item in Options.JobTypes) + { + var jobName = item.Key; + var jobType = item.Value; + var argsType = BackgroundJobArgsHelper.GetJobArgsType(jobType); + + var jobQueue = (IRunnable)ServiceProvider.GetRequiredService(typeof(IJobQueue<>).MakeGenericType(argsType)); + await jobQueue.StartAsync(cancellationToken); + JobQueues[jobName] = jobQueue; + } + } + + public async Task StopAsync(CancellationToken cancellationToken = default) + { + foreach (var jobQueue in JobQueues.Values) + { + await jobQueue.StopAsync(cancellationToken); + } + + JobQueues.Clear(); + } + + public IJobQueue Get() + { + var jobName = BackgroundJobNameAttribute.GetName(typeof(TArgs)); + + if (!Options.JobTypes.ContainsKey(jobName)) + { + throw new AbpException("No job registered"); + } + + return (IJobQueue)JobQueues.GetOrAdd(jobName, _ => + { + var jobQueue = (IRunnable)ServiceProvider.GetRequiredService(typeof(IJobQueue<>).MakeGenericType(typeof(TArgs))); + jobQueue.Start(); + return jobQueue; + }); + } + } +} diff --git a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs index 08ea5aaa7e..143f71ba2f 100644 --- a/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs +++ b/framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs @@ -8,13 +8,11 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ { public class RabbitMqBackgroundJobManager : IBackgroundJobManager, ITransientDependency { - protected IChannelPool ChannelPool { get; } - protected IRabbitMqSerializer Serializer { get; } + private readonly IJobQueueManager _jobQueueManager; - public RabbitMqBackgroundJobManager(IChannelPool channelPool, IRabbitMqSerializer serializer) + public RabbitMqBackgroundJobManager(IJobQueueManager jobQueueManager) { - Serializer = serializer; - ChannelPool = channelPool; + _jobQueueManager = jobQueueManager; } public Task EnqueueAsync( @@ -22,7 +20,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { - + var jobQueue = _jobQueueManager.Get(); + return jobQueue.EnqueueAsync(args, priority, delay); } } } diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AbpBackgroundWorkersModule.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AbpBackgroundWorkersModule.cs index 9594eedd85..8d4f94e90c 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AbpBackgroundWorkersModule.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AbpBackgroundWorkersModule.cs @@ -33,7 +33,7 @@ namespace Volo.Abp.BackgroundWorkers { context.ServiceProvider .GetRequiredService() - .StopAndWaitToStop(); + .Stop(); } } } diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerBase.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerBase.cs index b2ae62fc6e..1ce71ca3f9 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerBase.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerBase.cs @@ -1,13 +1,14 @@ +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Volo.Abp.Threading; namespace Volo.Abp.BackgroundWorkers { /// /// Base class that can be used to implement . /// - public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker + public abstract class BackgroundWorkerBase : IBackgroundWorker { //TODO: Add UOW, Localization and other useful properties..? @@ -17,28 +18,19 @@ namespace Volo.Abp.BackgroundWorkers { Logger = NullLogger.Instance; } - - public override void Start() + + public virtual Task StartAsync(CancellationToken cancellationToken = default) { - Logger.LogDebug("Starting background worker: " + ToString()); - base.Start(); Logger.LogDebug("Started background worker: " + ToString()); + return Task.CompletedTask; } - public override void Stop() + public virtual Task StopAsync(CancellationToken cancellationToken = default) { - Logger.LogDebug("Stopping background worker: " + ToString()); - base.Stop(); Logger.LogDebug("Stopped background worker: " + ToString()); + return Task.CompletedTask; } - public override void WaitToStop() - { - Logger.LogDebug("Waiting background worker to completely stop: " + ToString()); - base.WaitToStop(); - Logger.LogDebug("Background worker is completely stopped: " + ToString()); - } - public override string ToString() { return GetType().FullName; diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs index 474db34ab7..16c9f08d10 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using Volo.Abp.DependencyInjection; using Volo.Abp.Threading; @@ -8,8 +10,12 @@ namespace Volo.Abp.BackgroundWorkers /// /// Implements . /// - public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable + public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable { + protected bool IsRunning { get; private set; } + + private bool _isDisposed; + private readonly List _backgroundWorkers; /// @@ -20,27 +26,6 @@ namespace Volo.Abp.BackgroundWorkers _backgroundWorkers = new List(); } - public override void Start() - { - base.Start(); - - _backgroundWorkers.ForEach(worker => worker.Start()); - } - - public override void Stop() - { - _backgroundWorkers.ForEach(worker => worker.Stop()); - - base.Stop(); - } - - public override void WaitToStop() - { - _backgroundWorkers.ForEach(worker => worker.WaitToStop()); - - base.WaitToStop(); - } - public void Add(IBackgroundWorker worker) { _backgroundWorkers.Add(worker); @@ -51,8 +36,6 @@ namespace Volo.Abp.BackgroundWorkers } } - private bool _isDisposed; - public void Dispose() { if (_isDisposed) @@ -64,5 +47,25 @@ namespace Volo.Abp.BackgroundWorkers //TODO: ??? } + + public async Task StartAsync(CancellationToken cancellationToken = default) + { + IsRunning = true; + + foreach (var worker in _backgroundWorkers) + { + await worker.StartAsync(cancellationToken); + } + } + + public async Task StopAsync(CancellationToken cancellationToken = default) + { + IsRunning = false; + + foreach (var worker in _backgroundWorkers) + { + await worker.StopAsync(cancellationToken); + } + } } } diff --git a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs index 38d8448c72..49773826a7 100644 --- a/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs +++ b/framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs @@ -1,4 +1,6 @@ using System; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Volo.Abp.Threading; @@ -21,27 +23,18 @@ namespace Volo.Abp.BackgroundWorkers Timer.Elapsed += Timer_Elapsed; } - public override void Start() + public override async Task StartAsync(CancellationToken cancellationToken = default) { - base.Start(); - Timer.Start(); + await base.StartAsync(cancellationToken); + await Timer.StartAsync(cancellationToken); } - public override void Stop() + public override async Task StopAsync(CancellationToken cancellationToken = default) { - Timer.Stop(); - base.Stop(); + await Timer.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken); } - - public override void WaitToStop() - { - Timer.WaitToStop(); - base.WaitToStop(); - } - - /// - /// Handles the Elapsed event of the Timer. - /// + private void Timer_Elapsed(object sender, System.EventArgs e) { try diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs index e48218a7cf..e354ca90ec 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs @@ -23,6 +23,8 @@ namespace Volo.Abp.RabbitMQ public virtual IConnection Get(string connectionName = null) { + connectionName = connectionName ?? RabbitMqConnections.DefaultConnectionName; + return Connections.GetOrAdd(connectionName, () => { var connectionFactory = Options.Connections.GetOrDefault(connectionName) diff --git a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs index c61dd645f4..4733ce01e2 100644 --- a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs +++ b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs @@ -1,5 +1,8 @@ using System; using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Volo.Abp.DependencyInjection; namespace Volo.Abp.Threading @@ -7,7 +10,7 @@ namespace Volo.Abp.Threading /// /// A roboust timer implementation that ensures no overlapping occurs. It waits exactly specified between ticks. /// - public class AbpTimer : RunnableBase, ITransientDependency + public class AbpTimer : IRunnable, ITransientDependency { /// /// This event is raised periodically according to Period of Timer. @@ -25,74 +28,49 @@ namespace Volo.Abp.Threading /// public bool RunOnStart { get; set; } - /// - /// This timer is used to perfom the task at spesified intervals. - /// - private readonly Timer _taskTimer; + public ILogger Logger { get; set; } - /// - /// Indicates that whether timer is running or stopped. - /// - private volatile bool _running; - - /// - /// Indicates that whether performing the task or _taskTimer is in sleep mode. - /// This field is used to wait executing tasks when stopping Timer. - /// + private readonly Timer _taskTimer; private volatile bool _performingTasks; + private volatile bool _isRunning; - /// - /// Creates a new Timer. - /// public AbpTimer() { + Logger = NullLogger.Instance; + _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite); } - /// - /// Starts the timer. - /// - public override void Start() + public Task StartAsync(CancellationToken cancellationToken = default) { if (Period <= 0) { throw new AbpException("Period should be set before starting the timer!"); } - base.Start(); - - _running = true; - _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); - } - - /// - /// Stops the timer. - /// - public override void Stop() - { lock (_taskTimer) { - _running = false; - _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); + _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); + _isRunning = true; } - base.Stop(); + return Task.CompletedTask; } - /// - /// Waits the service to stop. - /// - public override void WaitToStop() + public Task StopAsync(CancellationToken cancellationToken = default) { lock (_taskTimer) { + _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); while (_performingTasks) { Monitor.Wait(_taskTimer); } + + _isRunning = false; } - base.WaitToStop(); + return Task.CompletedTask; } /// @@ -103,7 +81,7 @@ namespace Volo.Abp.Threading { lock (_taskTimer) { - if (!_running || _performingTasks) + if (!_isRunning || _performingTasks) { return; } @@ -114,10 +92,7 @@ namespace Volo.Abp.Threading try { - if (Elapsed != null) - { - Elapsed(this, new EventArgs()); - } + Elapsed.InvokeSafely(this, new EventArgs()); } catch { @@ -128,7 +103,7 @@ namespace Volo.Abp.Threading lock (_taskTimer) { _performingTasks = false; - if (_running) + if (_isRunning) { _taskTimer.Change(Period, Timeout.Infinite); } diff --git a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/IRunnable.cs b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/IRunnable.cs index 6aa5b2d1e4..38b838af86 100644 --- a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/IRunnable.cs +++ b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/IRunnable.cs @@ -1,4 +1,7 @@ -namespace Volo.Abp.Threading +using System.Threading; +using System.Threading.Tasks; + +namespace Volo.Abp.Threading { /// /// Interface to start/stop self threaded services. @@ -8,18 +11,11 @@ /// /// Starts the service. /// - void Start(); - - /// - /// Sends stop command to the service. - /// Service may return immediately and stop asynchronously. - /// A client should then call method to ensure it's stopped. - /// - void Stop(); + Task StartAsync(CancellationToken cancellationToken = default); /// - /// Waits the service to stop. + /// Stops the service. /// - void WaitToStop(); + Task StopAsync(CancellationToken cancellationToken = default); } } diff --git a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableBase.cs b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableBase.cs deleted file mode 100644 index bc72baeddd..0000000000 --- a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableBase.cs +++ /dev/null @@ -1,29 +0,0 @@ -namespace Volo.Abp.Threading -{ - /// - /// Base implementation of . - /// - public abstract class RunnableBase : IRunnable - { - /// - /// A boolean value to check if this is running. - /// - public bool IsRunning => _isRunning; - private volatile bool _isRunning; - - public virtual void Start() - { - _isRunning = true; - } - - public virtual void Stop() - { - _isRunning = false; - } - - public virtual void WaitToStop() - { - - } - } -} \ No newline at end of file diff --git a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableExtensions.cs b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableExtensions.cs index 91e423b15b..0fec7986fb 100644 --- a/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableExtensions.cs +++ b/framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableExtensions.cs @@ -1,17 +1,21 @@ +using JetBrains.Annotations; + namespace Volo.Abp.Threading { - /// - /// Some extension methods for . - /// public static class RunnableExtensions { - /// - /// Calls and then . - /// - public static void StopAndWaitToStop(this IRunnable runnable) + public static void Start([NotNull] this IRunnable runnable) { - runnable.Stop(); - runnable.WaitToStop(); + Check.NotNull(runnable, nameof(runnable)); + + AsyncHelper.RunSync(() => runnable.StartAsync()); + } + + public static void Stop([NotNull] this IRunnable runnable) + { + Check.NotNull(runnable, nameof(runnable)); + + AsyncHelper.RunSync(() => runnable.StopAsync()); } } } \ No newline at end of file diff --git a/modules/background-jobs/Volo.Abp.BackgroundJobs.sln b/modules/background-jobs/Volo.Abp.BackgroundJobs.sln index 8c6f5a0cad..09bfc90457 100644 --- a/modules/background-jobs/Volo.Abp.BackgroundJobs.sln +++ b/modules/background-jobs/Volo.Abp.BackgroundJobs.sln @@ -31,6 +31,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.Dem EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.DemoApp.HangFire", "app\Volo.Abp.BackgroundJobs.DemoApp.HangFire\Volo.Abp.BackgroundJobs.DemoApp.HangFire.csproj", "{2060AC85-2598-4342-A87C-A684A2C71A37}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.DemoApp.RabbitMq", "app\Volo.Abp.BackgroundJobs.DemoApp.RabbitMq\Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj", "{7C8D03F7-165E-478C-A6D6-DC8229EEABB9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -81,6 +83,10 @@ Global {2060AC85-2598-4342-A87C-A684A2C71A37}.Debug|Any CPU.Build.0 = Debug|Any CPU {2060AC85-2598-4342-A87C-A684A2C71A37}.Release|Any CPU.ActiveCfg = Release|Any CPU {2060AC85-2598-4342-A87C-A684A2C71A37}.Release|Any CPU.Build.0 = Release|Any CPU + {7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -97,6 +103,7 @@ Global {9A871D66-BE8D-440C-BF79-F778F8A50BF3} = {E400416D-2895-4512-9D17-90681EEC7E0A} {DE6914FF-F60A-461A-8498-461198E917BE} = {E400416D-2895-4512-9D17-90681EEC7E0A} {2060AC85-2598-4342-A87C-A684A2C71A37} = {E400416D-2895-4512-9D17-90681EEC7E0A} + {7C8D03F7-165E-478C-A6D6-DC8229EEABB9} = {E400416D-2895-4512-9D17-90681EEC7E0A} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/DemoAppHangfireModule.cs b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/DemoAppHangfireModule.cs new file mode 100644 index 0000000000..cb6432fdcc --- /dev/null +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/DemoAppHangfireModule.cs @@ -0,0 +1,30 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Volo.Abp.Autofac; +using Volo.Abp.BackgroundJobs.DemoApp.Shared; +using Volo.Abp.BackgroundJobs.RabbitMQ; +using Volo.Abp.Modularity; + +namespace Volo.Abp.BackgroundJobs.DemoApp.RabbitMq +{ + [DependsOn( + typeof(DemoAppSharedModule), + typeof(AbpAutofacModule), + typeof(AbpBackgroundJobsRabbitMqModule) + )] + public class DemoAppHangfireModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + context.Services.AddAssemblyOf(); + } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context + .ServiceProvider + .GetRequiredService() + .AddConsole(LogLevel.Debug); + } + } +} diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Program.cs b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Program.cs new file mode 100644 index 0000000000..bf06975175 --- /dev/null +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Program.cs @@ -0,0 +1,24 @@ +using System; + +namespace Volo.Abp.BackgroundJobs.DemoApp.RabbitMq +{ + class Program + { + static void Main(string[] args) + { + using (var application = AbpApplicationFactory.Create(options => + { + options.UseAutofac(); + })) + { + application.Initialize(); + + Console.WriteLine("Started: " + typeof(Program).Namespace); + Console.WriteLine("Press ENTER to stop the application..!"); + Console.ReadLine(); + + application.Shutdown(); + } + } + } +} diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj new file mode 100644 index 0000000000..d0418a48de --- /dev/null +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj @@ -0,0 +1,14 @@ + + + + Exe + netcoreapp2.1 + + + + + + + + + diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJob.cs b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJob.cs index 8d1d989a7a..f2d2854f1f 100644 --- a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJob.cs +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJob.cs @@ -16,7 +16,7 @@ namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine(); - Console.WriteLine($"############### WriteToConsoleGreenJob: {args.Value} ###############"); + Console.WriteLine($"############### WriteToConsoleGreenJob: {args.Value} - {args.Time:HH:mm:ss} ###############"); Console.WriteLine(); Console.ForegroundColor = oldColor; diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJobArgs.cs b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJobArgs.cs index 6e3fb82e57..ba70d9ca07 100644 --- a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJobArgs.cs +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJobArgs.cs @@ -1,8 +1,17 @@ -namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs +using System; + +namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs { [BackgroundJobName("GreenJob")] public class WriteToConsoleGreenJobArgs { public string Value { get; set; } + + public DateTime Time { get; set; } + + public WriteToConsoleGreenJobArgs() + { + Time = DateTime.Now; + } } } \ No newline at end of file diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJob.cs b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJob.cs index 90838bc87d..99e3023081 100644 --- a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJob.cs +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJob.cs @@ -9,14 +9,14 @@ namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs { if (RandomHelper.GetRandom(0, 100) < 70) { - throw new ApplicationException("A sample exception from the WriteToConsoleYellowJob!"); + //throw new ApplicationException("A sample exception from the WriteToConsoleYellowJob!"); } var oldColor = Console.ForegroundColor; Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine(); - Console.WriteLine($"############### WriteToConsoleYellowJob: {args.Value} ###############"); + Console.WriteLine($"############### WriteToConsoleYellowJob: {args.Value} - {args.Time:HH:mm:ss} ###############"); Console.WriteLine(); Console.ForegroundColor = oldColor; diff --git a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJobArgs.cs b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJobArgs.cs index e33ca2aa89..d0788fccd4 100644 --- a/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJobArgs.cs +++ b/modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJobArgs.cs @@ -1,8 +1,17 @@ -namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs +using System; + +namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs { [BackgroundJobName("YellowJob")] public class WriteToConsoleYellowJobArgs { public string Value { get; set; } + + public DateTime Time { get; set; } + + public WriteToConsoleYellowJobArgs() + { + Time = DateTime.Now; + } } }