Browse Source

Implemented rabbitmq background jobs.

pull/395/head
Halil ibrahim Kalkan 8 years ago
parent
commit
3c87b72d75
  1. 18
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs
  2. 11
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueue.cs
  3. 9
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs
  4. 63
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs
  5. 74
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs
  6. 11
      framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs
  7. 2
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AbpBackgroundWorkersModule.cs
  8. 24
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerBase.cs
  9. 51
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs
  10. 25
      framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs
  11. 2
      framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs
  12. 67
      framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs
  13. 18
      framework/src/Volo.Abp.Threading/Volo/Abp/Threading/IRunnable.cs
  14. 29
      framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableBase.cs
  15. 22
      framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableExtensions.cs
  16. 7
      modules/background-jobs/Volo.Abp.BackgroundJobs.sln
  17. 30
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/DemoAppHangfireModule.cs
  18. 24
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Program.cs
  19. 14
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj
  20. 2
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJob.cs
  21. 11
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJobArgs.cs
  22. 4
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJob.cs
  23. 11
      modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJobArgs.cs

18
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/AbpBackgroundJobsRabbitMqModule.cs

@ -1,13 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.Modularity;
using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
[DependsOn(
typeof(AbpBackgroundJobsAbstractionsModule),
typeof(AbpRabbitMqModule)
typeof(AbpRabbitMqModule),
typeof(AbpThreadingModule)
)]
public class AbpBackgroundJobsRabbitMqModule : AbpModule
{
@ -20,16 +21,9 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
//TODO: Move all to another class and stop listeners when needed!
var options = context.ServiceProvider.GetRequiredService<IOptions<BackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
foreach (var jobType in options.JobTypes.Values)
{
var jobListener = (IJobListener)context.ServiceProvider.GetRequiredService(typeof(JobListener<>).MakeGenericType(jobType));
jobListener.Start();
}
}
context.ServiceProvider
.GetRequiredService<IJobQueueManager>()
.Start();
}
}
}

11
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueue.cs

@ -1,12 +1,15 @@
using System;
using System.Threading.Tasks;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public interface IJobQueue<TArgs> : IDisposable
public interface IJobQueue<TArgs> : IRunnable, IDisposable
{
Task<string> Enqueue(TArgs args);
Task StartAsync();
Task<string> EnqueueAsync(
TArgs args,
BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null
);
}
}

9
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/IJobQueueManager.cs

@ -0,0 +1,9 @@
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public interface IJobQueueManager : IRunnable
{
IJobQueue<TArgs> Get<TArgs>();
}
}

63
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueue.cs

@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Nito.AsyncEx;
using RabbitMQ.Client;
@ -9,6 +12,8 @@ using Volo.Abp.RabbitMQ;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
//TODO: Needs refactoring
public class JobQueue<TArgs> : IJobQueue<TArgs>
{
protected Type JobType { get; }
@ -17,6 +22,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
protected IChannelAccessor ChannelAccessor { get; private set; }
protected EventingBasicConsumer Consumer { get; private set; }
public ILogger<JobQueue<TArgs>> Logger { get; set; }
protected IChannelPool ChannelPool { get; }
protected AbpRabbitMqOptions RabbitMqOptions { get; }
@ -43,9 +50,14 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
JobName = BackgroundJobNameAttribute.GetName<TArgs>();
JobType = BackgroundJobOptions.GetJobType(JobName);
QueueName = "BackgroundJobs." + JobName; //TODO: Make prefix optional
Logger = NullLogger<JobQueue<TArgs>>.Instance;
}
public virtual async Task<string> Enqueue(TArgs args)
public virtual async Task<string> EnqueueAsync(
TArgs args,
BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
CheckDisposed();
@ -53,20 +65,31 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
await EnsureInitializedAsync();
await PublishAsync(args);
await PublishAsync(args, priority, delay);
return null;
}
}
public Task StartAsync()
public async Task StartAsync(CancellationToken cancellationToken = default)
{
CheckDisposed();
if (!BackgroundJobOptions.IsJobExecutionEnabled)
{
return Task.CompletedTask;
return;
}
using (await SyncObj.LockAsync())
{
await EnsureInitializedAsync();
}
}
return EnsureInitializedAsync();
public Task StopAsync(CancellationToken cancellationToken = default)
{
Dispose();
return Task.CompletedTask;
}
public virtual void Dispose()
@ -81,7 +104,7 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
ChannelAccessor?.Dispose();
}
public virtual Task EnsureInitializedAsync()
protected virtual Task EnsureInitializedAsync()
{
if (ChannelAccessor != null)
{
@ -93,7 +116,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
var queueOptions = RabbitMqOptions.Queues.GetOrDefault(QueueName)
?? new QueueOptions(QueueName);
queueOptions.Declare(ChannelAccessor.Channel);
var result = queueOptions.Declare(ChannelAccessor.Channel);
Logger.LogDebug($"RabbitMQ Queue '{QueueName}' has {result.MessageCount} messages and {result.ConsumerCount} consumers.");
if (BackgroundJobOptions.IsJobExecutionEnabled)
{
@ -111,8 +135,13 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
return Task.CompletedTask;
}
protected virtual Task PublishAsync(TArgs args)
protected virtual Task PublishAsync(
TArgs args,
BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
//TODO: How to handle priority & delay?
ChannelAccessor.Channel.BasicPublish(
exchange: "",
routingKey: QueueName,
@ -137,9 +166,21 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
Serializer.Deserialize(ea.Body, typeof(TArgs))
);
JobExecuter.Execute(context);
//TODO: How to ACK on success or Reject on failure?
try
{
JobExecuter.Execute(context);
ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (BackgroundJobExecutionException)
{
//TODO: Reject like that?
ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
catch (Exception)
{
//TODO: Reject like that?
ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
}
}
private void CheckDisposed()

74
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/JobQueueManager.cs

@ -0,0 +1,74 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public class JobQueueManager : IJobQueueManager, ISingletonDependency
{
protected ConcurrentDictionary<string, IRunnable> JobQueues { get; }
protected IServiceProvider ServiceProvider { get; }
protected BackgroundJobOptions Options { get; }
public JobQueueManager(
IOptions<BackgroundJobOptions> options,
IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
Options = options.Value;
JobQueues = new ConcurrentDictionary<string, IRunnable>();
}
public async Task StartAsync(CancellationToken cancellationToken = default)
{
if (!Options.IsJobExecutionEnabled)
{
return;
}
foreach (var item in Options.JobTypes)
{
var jobName = item.Key;
var jobType = item.Value;
var argsType = BackgroundJobArgsHelper.GetJobArgsType(jobType);
var jobQueue = (IRunnable)ServiceProvider.GetRequiredService(typeof(IJobQueue<>).MakeGenericType(argsType));
await jobQueue.StartAsync(cancellationToken);
JobQueues[jobName] = jobQueue;
}
}
public async Task StopAsync(CancellationToken cancellationToken = default)
{
foreach (var jobQueue in JobQueues.Values)
{
await jobQueue.StopAsync(cancellationToken);
}
JobQueues.Clear();
}
public IJobQueue<TArgs> Get<TArgs>()
{
var jobName = BackgroundJobNameAttribute.GetName(typeof(TArgs));
if (!Options.JobTypes.ContainsKey(jobName))
{
throw new AbpException("No job registered");
}
return (IJobQueue<TArgs>)JobQueues.GetOrAdd(jobName, _ =>
{
var jobQueue = (IRunnable)ServiceProvider.GetRequiredService(typeof(IJobQueue<>).MakeGenericType(typeof(TArgs)));
jobQueue.Start();
return jobQueue;
});
}
}
}

11
framework/src/Volo.Abp.BackgroundJobs.RabbitMQ/Volo/Abp/BackgroundJobs/RabbitMQ/RabbitMqBackgroundJobManager.cs

@ -8,13 +8,11 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
public class RabbitMqBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
protected IChannelPool ChannelPool { get; }
protected IRabbitMqSerializer Serializer { get; }
private readonly IJobQueueManager _jobQueueManager;
public RabbitMqBackgroundJobManager(IChannelPool channelPool, IRabbitMqSerializer serializer)
public RabbitMqBackgroundJobManager(IJobQueueManager jobQueueManager)
{
Serializer = serializer;
ChannelPool = channelPool;
_jobQueueManager = jobQueueManager;
}
public Task<string> EnqueueAsync<TArgs>(
@ -22,7 +20,8 @@ namespace Volo.Abp.BackgroundJobs.RabbitMQ
BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
var jobQueue = _jobQueueManager.Get<TArgs>();
return jobQueue.EnqueueAsync(args, priority, delay);
}
}
}

2
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/AbpBackgroundWorkersModule.cs

@ -33,7 +33,7 @@ namespace Volo.Abp.BackgroundWorkers
{
context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StopAndWaitToStop();
.Stop();
}
}
}

24
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerBase.cs

@ -1,13 +1,14 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.Threading;
namespace Volo.Abp.BackgroundWorkers
{
/// <summary>
/// Base class that can be used to implement <see cref="IBackgroundWorker"/>.
/// </summary>
public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker
public abstract class BackgroundWorkerBase : IBackgroundWorker
{
//TODO: Add UOW, Localization and other useful properties..?
@ -17,28 +18,19 @@ namespace Volo.Abp.BackgroundWorkers
{
Logger = NullLogger<BackgroundWorkerBase>.Instance;
}
public override void Start()
public virtual Task StartAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Starting background worker: " + ToString());
base.Start();
Logger.LogDebug("Started background worker: " + ToString());
return Task.CompletedTask;
}
public override void Stop()
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Stopping background worker: " + ToString());
base.Stop();
Logger.LogDebug("Stopped background worker: " + ToString());
return Task.CompletedTask;
}
public override void WaitToStop()
{
Logger.LogDebug("Waiting background worker to completely stop: " + ToString());
base.WaitToStop();
Logger.LogDebug("Background worker is completely stopped: " + ToString());
}
public override string ToString()
{
return GetType().FullName;

51
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/BackgroundWorkerManager.cs

@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;
@ -8,8 +10,12 @@ namespace Volo.Abp.BackgroundWorkers
/// <summary>
/// Implements <see cref="IBackgroundWorkerManager"/>.
/// </summary>
public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
protected bool IsRunning { get; private set; }
private bool _isDisposed;
private readonly List<IBackgroundWorker> _backgroundWorkers;
/// <summary>
@ -20,27 +26,6 @@ namespace Volo.Abp.BackgroundWorkers
_backgroundWorkers = new List<IBackgroundWorker>();
}
public override void Start()
{
base.Start();
_backgroundWorkers.ForEach(worker => worker.Start());
}
public override void Stop()
{
_backgroundWorkers.ForEach(worker => worker.Stop());
base.Stop();
}
public override void WaitToStop()
{
_backgroundWorkers.ForEach(worker => worker.WaitToStop());
base.WaitToStop();
}
public void Add(IBackgroundWorker worker)
{
_backgroundWorkers.Add(worker);
@ -51,8 +36,6 @@ namespace Volo.Abp.BackgroundWorkers
}
}
private bool _isDisposed;
public void Dispose()
{
if (_isDisposed)
@ -64,5 +47,25 @@ namespace Volo.Abp.BackgroundWorkers
//TODO: ???
}
public async Task StartAsync(CancellationToken cancellationToken = default)
{
IsRunning = true;
foreach (var worker in _backgroundWorkers)
{
await worker.StartAsync(cancellationToken);
}
}
public async Task StopAsync(CancellationToken cancellationToken = default)
{
IsRunning = false;
foreach (var worker in _backgroundWorkers)
{
await worker.StopAsync(cancellationToken);
}
}
}
}

25
framework/src/Volo.Abp.BackgroundWorkers/Volo/Abp/BackgroundWorkers/PeriodicBackgroundWorkerBase.cs

@ -1,4 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.Threading;
@ -21,27 +23,18 @@ namespace Volo.Abp.BackgroundWorkers
Timer.Elapsed += Timer_Elapsed;
}
public override void Start()
public override async Task StartAsync(CancellationToken cancellationToken = default)
{
base.Start();
Timer.Start();
await base.StartAsync(cancellationToken);
await Timer.StartAsync(cancellationToken);
}
public override void Stop()
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop();
base.Stop();
await Timer.StopAsync(cancellationToken);
await base.StopAsync(cancellationToken);
}
public override void WaitToStop()
{
Timer.WaitToStop();
base.WaitToStop();
}
/// <summary>
/// Handles the Elapsed event of the Timer.
/// </summary>
private void Timer_Elapsed(object sender, System.EventArgs e)
{
try

2
framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/ConnectionPool.cs

@ -23,6 +23,8 @@ namespace Volo.Abp.RabbitMQ
public virtual IConnection Get(string connectionName = null)
{
connectionName = connectionName ?? RabbitMqConnections.DefaultConnectionName;
return Connections.GetOrAdd(connectionName, () =>
{
var connectionFactory = Options.Connections.GetOrDefault(connectionName)

67
framework/src/Volo.Abp.Threading/Volo/Abp/Threading/AbpTimer.cs

@ -1,5 +1,8 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Threading
@ -7,7 +10,7 @@ namespace Volo.Abp.Threading
/// <summary>
/// A roboust timer implementation that ensures no overlapping occurs. It waits exactly specified <see cref="Period"/> between ticks.
/// </summary>
public class AbpTimer : RunnableBase, ITransientDependency
public class AbpTimer : IRunnable, ITransientDependency
{
/// <summary>
/// This event is raised periodically according to Period of Timer.
@ -25,74 +28,49 @@ namespace Volo.Abp.Threading
/// </summary>
public bool RunOnStart { get; set; }
/// <summary>
/// This timer is used to perfom the task at spesified intervals.
/// </summary>
private readonly Timer _taskTimer;
public ILogger<AbpTimer> Logger { get; set; }
/// <summary>
/// Indicates that whether timer is running or stopped.
/// </summary>
private volatile bool _running;
/// <summary>
/// Indicates that whether performing the task or _taskTimer is in sleep mode.
/// This field is used to wait executing tasks when stopping Timer.
/// </summary>
private readonly Timer _taskTimer;
private volatile bool _performingTasks;
private volatile bool _isRunning;
/// <summary>
/// Creates a new Timer.
/// </summary>
public AbpTimer()
{
Logger = NullLogger<AbpTimer>.Instance;
_taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
}
/// <summary>
/// Starts the timer.
/// </summary>
public override void Start()
public Task StartAsync(CancellationToken cancellationToken = default)
{
if (Period <= 0)
{
throw new AbpException("Period should be set before starting the timer!");
}
base.Start();
_running = true;
_taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
}
/// <summary>
/// Stops the timer.
/// </summary>
public override void Stop()
{
lock (_taskTimer)
{
_running = false;
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
_taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
_isRunning = true;
}
base.Stop();
return Task.CompletedTask;
}
/// <summary>
/// Waits the service to stop.
/// </summary>
public override void WaitToStop()
public Task StopAsync(CancellationToken cancellationToken = default)
{
lock (_taskTimer)
{
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
while (_performingTasks)
{
Monitor.Wait(_taskTimer);
}
_isRunning = false;
}
base.WaitToStop();
return Task.CompletedTask;
}
/// <summary>
@ -103,7 +81,7 @@ namespace Volo.Abp.Threading
{
lock (_taskTimer)
{
if (!_running || _performingTasks)
if (!_isRunning || _performingTasks)
{
return;
}
@ -114,10 +92,7 @@ namespace Volo.Abp.Threading
try
{
if (Elapsed != null)
{
Elapsed(this, new EventArgs());
}
Elapsed.InvokeSafely(this, new EventArgs());
}
catch
{
@ -128,7 +103,7 @@ namespace Volo.Abp.Threading
lock (_taskTimer)
{
_performingTasks = false;
if (_running)
if (_isRunning)
{
_taskTimer.Change(Period, Timeout.Infinite);
}

18
framework/src/Volo.Abp.Threading/Volo/Abp/Threading/IRunnable.cs

@ -1,4 +1,7 @@
namespace Volo.Abp.Threading
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.Threading
{
/// <summary>
/// Interface to start/stop self threaded services.
@ -8,18 +11,11 @@
/// <summary>
/// Starts the service.
/// </summary>
void Start();
/// <summary>
/// Sends stop command to the service.
/// Service may return immediately and stop asynchronously.
/// A client should then call <see cref="WaitToStop"/> method to ensure it's stopped.
/// </summary>
void Stop();
Task StartAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Waits the service to stop.
/// Stops the service.
/// </summary>
void WaitToStop();
Task StopAsync(CancellationToken cancellationToken = default);
}
}

29
framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableBase.cs

@ -1,29 +0,0 @@
namespace Volo.Abp.Threading
{
/// <summary>
/// Base implementation of <see cref="IRunnable"/>.
/// </summary>
public abstract class RunnableBase : IRunnable
{
/// <summary>
/// A boolean value to check if this is running.
/// </summary>
public bool IsRunning => _isRunning;
private volatile bool _isRunning;
public virtual void Start()
{
_isRunning = true;
}
public virtual void Stop()
{
_isRunning = false;
}
public virtual void WaitToStop()
{
}
}
}

22
framework/src/Volo.Abp.Threading/Volo/Abp/Threading/RunnableExtensions.cs

@ -1,17 +1,21 @@
using JetBrains.Annotations;
namespace Volo.Abp.Threading
{
/// <summary>
/// Some extension methods for <see cref="IRunnable"/>.
/// </summary>
public static class RunnableExtensions
{
/// <summary>
/// Calls <see cref="IRunnable.Stop"/> and then <see cref="IRunnable.WaitToStop"/>.
/// </summary>
public static void StopAndWaitToStop(this IRunnable runnable)
public static void Start([NotNull] this IRunnable runnable)
{
runnable.Stop();
runnable.WaitToStop();
Check.NotNull(runnable, nameof(runnable));
AsyncHelper.RunSync(() => runnable.StartAsync());
}
public static void Stop([NotNull] this IRunnable runnable)
{
Check.NotNull(runnable, nameof(runnable));
AsyncHelper.RunSync(() => runnable.StopAsync());
}
}
}

7
modules/background-jobs/Volo.Abp.BackgroundJobs.sln

@ -31,6 +31,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.Dem
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.DemoApp.HangFire", "app\Volo.Abp.BackgroundJobs.DemoApp.HangFire\Volo.Abp.BackgroundJobs.DemoApp.HangFire.csproj", "{2060AC85-2598-4342-A87C-A684A2C71A37}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundJobs.DemoApp.RabbitMq", "app\Volo.Abp.BackgroundJobs.DemoApp.RabbitMq\Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj", "{7C8D03F7-165E-478C-A6D6-DC8229EEABB9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -81,6 +83,10 @@ Global
{2060AC85-2598-4342-A87C-A684A2C71A37}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2060AC85-2598-4342-A87C-A684A2C71A37}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2060AC85-2598-4342-A87C-A684A2C71A37}.Release|Any CPU.Build.0 = Release|Any CPU
{7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7C8D03F7-165E-478C-A6D6-DC8229EEABB9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -97,6 +103,7 @@ Global
{9A871D66-BE8D-440C-BF79-F778F8A50BF3} = {E400416D-2895-4512-9D17-90681EEC7E0A}
{DE6914FF-F60A-461A-8498-461198E917BE} = {E400416D-2895-4512-9D17-90681EEC7E0A}
{2060AC85-2598-4342-A87C-A684A2C71A37} = {E400416D-2895-4512-9D17-90681EEC7E0A}
{7C8D03F7-165E-478C-A6D6-DC8229EEABB9} = {E400416D-2895-4512-9D17-90681EEC7E0A}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

30
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/DemoAppHangfireModule.cs

@ -0,0 +1,30 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.Autofac;
using Volo.Abp.BackgroundJobs.DemoApp.Shared;
using Volo.Abp.BackgroundJobs.RabbitMQ;
using Volo.Abp.Modularity;
namespace Volo.Abp.BackgroundJobs.DemoApp.RabbitMq
{
[DependsOn(
typeof(DemoAppSharedModule),
typeof(AbpAutofacModule),
typeof(AbpBackgroundJobsRabbitMqModule)
)]
public class DemoAppHangfireModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddAssemblyOf<DemoAppHangfireModule>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context
.ServiceProvider
.GetRequiredService<ILoggerFactory>()
.AddConsole(LogLevel.Debug);
}
}
}

24
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Program.cs

@ -0,0 +1,24 @@
using System;
namespace Volo.Abp.BackgroundJobs.DemoApp.RabbitMq
{
class Program
{
static void Main(string[] args)
{
using (var application = AbpApplicationFactory.Create<DemoAppHangfireModule>(options =>
{
options.UseAutofac();
}))
{
application.Initialize();
Console.WriteLine("Started: " + typeof(Program).Namespace);
Console.WriteLine("Press ENTER to stop the application..!");
Console.ReadLine();
application.Shutdown();
}
}
}
}

14
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq/Volo.Abp.BackgroundJobs.DemoApp.RabbitMq.csproj

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\framework\src\Volo.Abp.Autofac\Volo.Abp.Autofac.csproj" />
<ProjectReference Include="..\..\..\..\framework\src\Volo.Abp.BackgroundJobs.RabbitMQ\Volo.Abp.BackgroundJobs.RabbitMQ.csproj" />
<ProjectReference Include="..\Volo.Abp.BackgroundJobs.DemoApp.Shared\Volo.Abp.BackgroundJobs.DemoApp.Shared.csproj" />
</ItemGroup>
</Project>

2
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJob.cs

@ -16,7 +16,7 @@ namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine();
Console.WriteLine($"############### WriteToConsoleGreenJob: {args.Value} ###############");
Console.WriteLine($"############### WriteToConsoleGreenJob: {args.Value} - {args.Time:HH:mm:ss} ###############");
Console.WriteLine();
Console.ForegroundColor = oldColor;

11
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleGreenJobArgs.cs

@ -1,8 +1,17 @@
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
using System;
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
{
[BackgroundJobName("GreenJob")]
public class WriteToConsoleGreenJobArgs
{
public string Value { get; set; }
public DateTime Time { get; set; }
public WriteToConsoleGreenJobArgs()
{
Time = DateTime.Now;
}
}
}

4
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJob.cs

@ -9,14 +9,14 @@ namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
{
if (RandomHelper.GetRandom(0, 100) < 70)
{
throw new ApplicationException("A sample exception from the WriteToConsoleYellowJob!");
//throw new ApplicationException("A sample exception from the WriteToConsoleYellowJob!");
}
var oldColor = Console.ForegroundColor;
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine();
Console.WriteLine($"############### WriteToConsoleYellowJob: {args.Value} ###############");
Console.WriteLine($"############### WriteToConsoleYellowJob: {args.Value} - {args.Time:HH:mm:ss} ###############");
Console.WriteLine();
Console.ForegroundColor = oldColor;

11
modules/background-jobs/app/Volo.Abp.BackgroundJobs.DemoApp.Shared/Jobs/WriteToConsoleYellowJobArgs.cs

@ -1,8 +1,17 @@
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
using System;
namespace Volo.Abp.BackgroundJobs.DemoApp.Shared.Jobs
{
[BackgroundJobName("YellowJob")]
public class WriteToConsoleYellowJobArgs
{
public string Value { get; set; }
public DateTime Time { get; set; }
public WriteToConsoleYellowJobArgs()
{
Time = DateTime.Now;
}
}
}

Loading…
Cancel
Save