Browse Source

Merge pull request #22169 from abpframework/BackgroundJob-ApplicationName

Add `ApplicationName` property to isolate jobs/workers.
pull/22316/head
Halil İbrahim Kalkan 1 year ago
committed by GitHub
parent
commit
ec70d0c970
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      Directory.Packages.props
  2. 19
      framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireBackgroundJobManager.cs
  3. 5
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobWorkerOptions.cs
  4. 5
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobInfo.cs
  5. 2
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs
  6. 8
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/DefaultBackgroundJobManager.cs
  7. 5
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/IBackgroundJobStore.cs
  8. 3
      framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/InMemoryBackgroundJobStore.cs
  9. 2
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerBase.cs
  10. 23
      framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs
  11. 4
      framework/src/Volo.Abp.HangFire/Volo/Abp/Hangfire/AbpHangfireModule.cs
  12. 12
      framework/src/Volo.Abp.HangFire/Volo/Abp/Hangfire/AbpHangfireOptions.cs
  13. 65
      framework/src/Volo.Abp.HangFire/Volo/Abp/Hangfire/AbpHangfireOptionsConfiguration.cs
  14. 5
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain.Shared/Volo/Abp/BackgroundJobs/BackgroundJobRecordConsts.cs
  15. 5
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain/Volo/Abp/BackgroundJobs/BackgroundJobRecord.cs
  16. 4
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain/Volo/Abp/BackgroundJobs/BackgroundJobStore.cs
  17. 3
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain/Volo/Abp/BackgroundJobs/IBackgroundJobRepository.cs
  18. 1
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.EntityFrameworkCore/Volo/Abp/BackgroundJobs/EntityFrameworkCore/BackgroundJobsDbContextModelCreatingExtensions.cs
  19. 10
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.EntityFrameworkCore/Volo/Abp/BackgroundJobs/EntityFrameworkCore/EfCoreBackgroundJobRepository.cs
  20. 10
      modules/background-jobs/src/Volo.Abp.BackgroundJobs.MongoDB/Volo/Abp/BackgroundJobs/MongoDB/MongoBackgroundJobRepository.cs
  21. 9
      modules/background-jobs/test/Volo.Abp.BackgroundJobs.TestBase/Volo/Abp/BackgroundJobs/BackgroundJobRepository_Tests.cs
  22. 3
      modules/background-jobs/test/Volo.Abp.BackgroundJobs.TestBase/Volo/Abp/BackgroundJobs/BackgroundJobsTestDataBuilder.cs

1
Directory.Packages.props

@ -77,6 +77,7 @@
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.5.0" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="9.0.2" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.InMemory" Version="9.0.2" />

19
framework/src/Volo.Abp.BackgroundJobs.HangFire/Volo/Abp/BackgroundJobs/Hangfire/HangfireBackgroundJobManager.cs

@ -5,17 +5,20 @@ using Hangfire;
using Hangfire.States;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Hangfire;
namespace Volo.Abp.BackgroundJobs.Hangfire;
[Dependency(ReplaceServices = true)]
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
protected AbpBackgroundJobOptions Options { get; }
protected IOptions<AbpBackgroundJobOptions> BackgroundJobOptions { get; }
protected IOptions<AbpHangfireOptions> HangfireOptions { get; }
public HangfireBackgroundJobManager(IOptions<AbpBackgroundJobOptions> options)
public HangfireBackgroundJobManager(IOptions<AbpBackgroundJobOptions> backgroundJobOptions, IOptions<AbpHangfireOptions> hangfireOptions)
{
Options = options.Value;
BackgroundJobOptions = backgroundJobOptions;
HangfireOptions = hangfireOptions;
}
public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
@ -33,13 +36,7 @@ public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDep
protected virtual string GetQueueName(Type argsType)
{
var queueName = EnqueuedState.DefaultQueue;
var queueAttribute = Options.GetJob(argsType).JobType.GetCustomAttribute<QueueAttribute>();
if (queueAttribute != null)
{
queueName = queueAttribute.Queue;
}
return queueName;
var queueAttribute = BackgroundJobOptions.Value.GetJob(argsType).JobType.GetCustomAttribute<QueueAttribute>();
return queueAttribute != null ? HangfireOptions.Value.DefaultQueuePrefix + queueAttribute.Queue : HangfireOptions.Value.DefaultQueue;
}
}

5
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/AbpBackgroundJobWorkerOptions.cs

@ -2,6 +2,11 @@
public class AbpBackgroundJobWorkerOptions
{
/// <summary>
/// Application name.
/// </summary>
public string? ApplicationName { get; set; }
/// <summary>
/// Interval between polling jobs from <see cref="IBackgroundJobStore"/>.
/// Default value: 5000 (5 seconds).

5
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobInfo.cs

@ -9,6 +9,11 @@ public class BackgroundJobInfo
{
public Guid Id { get; set; }
/// <summary>
/// Application name.
/// </summary>
public virtual string? ApplicationName { get; set; }
/// <summary>
/// Name of the job.
/// </summary>

2
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/BackgroundJobWorker.cs

@ -43,7 +43,7 @@ public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroun
{
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.ApplicationName, WorkerOptions.MaxJobFetchCount);
if (!waitingJobs.Any())
{

8
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/DefaultBackgroundJobManager.cs

@ -1,5 +1,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using Volo.Abp.Timing;
@ -16,16 +18,19 @@ public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDepe
protected IBackgroundJobSerializer Serializer { get; }
protected IGuidGenerator GuidGenerator { get; }
protected IBackgroundJobStore Store { get; }
protected IOptions<AbpBackgroundJobWorkerOptions> BackgroundJobWorkerOptions { get; }
public DefaultBackgroundJobManager(
IClock clock,
IBackgroundJobSerializer serializer,
IBackgroundJobStore store,
IGuidGenerator guidGenerator)
IGuidGenerator guidGenerator,
IOptions<AbpBackgroundJobWorkerOptions> backgroundJobWorkerOptions)
{
Clock = clock;
Serializer = serializer;
GuidGenerator = guidGenerator;
BackgroundJobWorkerOptions = backgroundJobWorkerOptions;
Store = store;
}
@ -41,6 +46,7 @@ public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDepe
var jobInfo = new BackgroundJobInfo
{
Id = GuidGenerator.Create(),
ApplicationName = BackgroundJobWorkerOptions.Value.ApplicationName,
JobName = jobName,
JobArgs = Serializer.Serialize(args),
Priority = priority,

5
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/IBackgroundJobStore.cs

@ -24,12 +24,13 @@ public interface IBackgroundJobStore
/// <summary>
/// Gets waiting jobs. It should get jobs based on these:
/// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now.
/// Conditions: ApplicationName is applicationName And !IsAbandoned And NextTryTime &lt;= Clock.Now.
/// Order by: Priority DESC, TryCount ASC, NextTryTime ASC.
/// Maximum result: <paramref name="maxResultCount"/>.
/// </summary>
/// <param name="applicationName">Application name.</param>
/// <param name="maxResultCount">Maximum result count.</param>
Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount);
Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(string? applicationName, int maxResultCount);
/// <summary>
/// Deletes a job.

3
framework/src/Volo.Abp.BackgroundJobs/Volo/Abp/BackgroundJobs/InMemoryBackgroundJobStore.cs

@ -35,9 +35,10 @@ public class InMemoryBackgroundJobStore : IBackgroundJobStore, ISingletonDepende
return Task.CompletedTask;
}
public virtual Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
public virtual Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(string? applicationName, int maxResultCount)
{
var waitingJobs = _jobs.Values
.Where(t => t.ApplicationName == applicationName)
.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)

2
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerBase.cs

@ -13,7 +13,7 @@ public abstract class HangfireBackgroundWorkerBase : BackgroundWorkerBase, IHang
public TimeZoneInfo? TimeZone { get; set; } = TimeZoneInfo.Utc;
public string Queue { get; set; } = EnqueuedState.DefaultQueue;
public string Queue { get; set; } = default!;
public abstract Task DoWorkAsync(CancellationToken cancellationToken = default);
}

23
framework/src/Volo.Abp.BackgroundWorkers.Hangfire/Volo/Abp/BackgroundWorkers/Hangfire/HangfireBackgroundWorkerManager.cs

@ -4,6 +4,7 @@ using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
using Volo.Abp.Hangfire;
@ -30,15 +31,19 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
public async override Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
var abpHangfireOptions = ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
var defaultQueuePrefix = abpHangfireOptions.DefaultQueuePrefix;
var defaultQueue = abpHangfireOptions.DefaultQueue;
switch (worker)
{
case IHangfireBackgroundWorker hangfireBackgroundWorker:
{
var unProxyWorker = ProxyHelper.UnProxy(hangfireBackgroundWorker);
RecurringJob.AddOrUpdate(
hangfireBackgroundWorker.RecurringJobId,
hangfireBackgroundWorker.Queue,
hangfireBackgroundWorker.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + hangfireBackgroundWorker.Queue,
() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(cancellationToken),
hangfireBackgroundWorker.CronExpression,
new RecurringJobOptions
@ -57,24 +62,24 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
{
return;
}
var adapterType = typeof(HangfirePeriodicBackgroundWorkerAdapter<>).MakeGenericType(ProxyHelper.GetUnProxiedType(worker));
var workerAdapter = (Activator.CreateInstance(adapterType) as IHangfireBackgroundWorker)!;
if (workerAdapter.RecurringJobId.IsNullOrWhiteSpace())
{
RecurringJob.AddOrUpdate(
RecurringJob.AddOrUpdate(
() => workerAdapter.DoWorkAsync(cancellationToken),
GetCron(period.Value),
workerAdapter.TimeZone ,
workerAdapter.Queue);
workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue);
}
else
{
RecurringJob.AddOrUpdate(
workerAdapter.RecurringJobId,
workerAdapter.Queue,
workerAdapter.Queue.IsNullOrWhiteSpace() ? defaultQueue : defaultQueuePrefix + workerAdapter.Queue,
() => workerAdapter.DoWorkAsync(cancellationToken),
GetCron(period.Value),
new RecurringJobOptions
@ -82,7 +87,7 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
TimeZone = workerAdapter.TimeZone
});
}
break;
}
@ -121,4 +126,4 @@ public class HangfireBackgroundWorkerManager : BackgroundWorkerManager, ISinglet
return cron;
}
}
}

4
framework/src/Volo.Abp.HangFire/Volo/Abp/Hangfire/AbpHangfireModule.cs

@ -1,5 +1,6 @@
using Hangfire;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Volo.Abp.Authorization;
using Volo.Abp.Modularity;
@ -24,7 +25,10 @@ public class AbpHangfireModule : AbpModule
var options = serviceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
return new AbpHangfireBackgroundJobServer(options.BackgroundJobServerFactory.Invoke(serviceProvider));
});
context.Services.TryAddEnumerable(ServiceDescriptor.Singleton<IPostConfigureOptions<AbpHangfireOptions>, AbpHangfireOptionsConfiguration>());
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
_backgroundJobServer = context.ServiceProvider.GetRequiredService<AbpHangfireBackgroundJobServer>();

12
framework/src/Volo.Abp.HangFire/Volo/Abp/Hangfire/AbpHangfireOptions.cs

@ -12,6 +12,18 @@ namespace Volo.Abp.Hangfire;
public class AbpHangfireOptions
{
/// <summary>
/// This value is used to add prefix to all of the queues. Default is empty.
/// </summary>
public string DefaultQueuePrefix { get; set; } = string.Empty;
/// <summary>
/// Hangfire queue name max length, default is 50.
/// </summary>
public int MaxQueueNameLength { get; set; } = 50;
public string DefaultQueue { get; set; } = EnqueuedState.DefaultQueue;
public BackgroundJobServerOptions? ServerOptions { get; set; }
public IEnumerable<IBackgroundProcess>? AdditionalProcesses { get; set; }

65
framework/src/Volo.Abp.HangFire/Volo/Abp/Hangfire/AbpHangfireOptionsConfiguration.cs

@ -0,0 +1,65 @@
using System;
using System.Linq;
using System.Text.RegularExpressions;
using Hangfire;
using Hangfire.States;
using Microsoft.Extensions.Options;
namespace Volo.Abp.Hangfire;
public class AbpHangfireOptionsConfiguration : IPostConfigureOptions<AbpHangfireOptions>
{
public void PostConfigure(string? name, AbpHangfireOptions options)
{
if (options.DefaultQueuePrefix.IsNullOrWhiteSpace())
{
return;;
}
// The Queue name argument must consist of lowercase letters, digits, underscore, and dash characters only.
var queuesPrefix = Regex.Replace(options.DefaultQueuePrefix.ToLower().Replace(".", "_"), "[^a-z0-9_-]", "");
if (queuesPrefix.IsNullOrWhiteSpace())
{
throw new AbpException($"The QueuesPrefix({options.DefaultQueuePrefix}) is not valid, it must consist of lowercase letters, digits, underscore, and dash characters only.");
}
options.DefaultQueuePrefix = queuesPrefix.EnsureEndsWith('_');
if (options.ServerOptions == null)
{
var queue = $"{options.DefaultQueuePrefix}{EnqueuedState.DefaultQueue}";
if (queue.Length > options.MaxQueueNameLength)
{
throw new AbpException($"The maximum length of the Hangfire queue name({queue}) is {options.MaxQueueNameLength}, Please configure the AbpHangfireOptions.DefaultQueuePrefix manually.");
}
options.ServerOptions = new BackgroundJobServerOptions
{
Queues = new[] { queue }
};
options.DefaultQueue = queue;
}
else
{
var queues = options.ServerOptions.Queues;
for (var i = 0; i < queues.Length; i++)
{
var queue = $"{options.DefaultQueuePrefix}{queues[i]}";
if (queue.Length > options.MaxQueueNameLength)
{
throw new AbpException($"The maximum length of the Hangfire queue name({queue}) is {options.MaxQueueNameLength}, Please configure the AbpHangfireOptions.DefaultQueuePrefix manually.");
}
queues[i] = queue;
}
var defaultQueue = queues.FirstOrDefault(q => q.EndsWith(EnqueuedState.DefaultQueue));
if (defaultQueue.IsNullOrWhiteSpace())
{
defaultQueue = queues.FirstOrDefault();
if (defaultQueue.IsNullOrWhiteSpace())
{
throw new AbpException("There is no queue defined in the Hangfire configuration!");
}
}
options.DefaultQueue = defaultQueue;
}
}
}

5
modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain.Shared/Volo/Abp/BackgroundJobs/BackgroundJobRecordConsts.cs

@ -2,6 +2,11 @@
public static class BackgroundJobRecordConsts
{
/// <summary>
/// Default value: 96
/// </summary>
public static int MaxApplicationNameLength { get; set; } = 96;
/// <summary>
/// Default value: 128
/// </summary>

5
modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain/Volo/Abp/BackgroundJobs/BackgroundJobRecord.cs

@ -6,6 +6,11 @@ namespace Volo.Abp.BackgroundJobs;
public class BackgroundJobRecord : AggregateRoot<Guid>, IHasCreationTime
{
/// <summary>
/// Application name that scheduled this job.
/// </summary>
public virtual string ApplicationName { get; set; }
/// <summary>
/// Type of the job.
/// It's AssemblyQualifiedName of job type.

4
modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain/Volo/Abp/BackgroundJobs/BackgroundJobStore.cs

@ -34,10 +34,10 @@ public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
);
}
public virtual async Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
public virtual async Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(string applicationName, int maxResultCount)
{
return ObjectMapper.Map<List<BackgroundJobRecord>, List<BackgroundJobInfo>>(
await BackgroundJobRepository.GetWaitingListAsync(maxResultCount)
await BackgroundJobRepository.GetWaitingListAsync(applicationName, maxResultCount)
);
}

3
modules/background-jobs/src/Volo.Abp.BackgroundJobs.Domain/Volo/Abp/BackgroundJobs/IBackgroundJobRepository.cs

@ -2,11 +2,12 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Volo.Abp.Domain.Repositories;
namespace Volo.Abp.BackgroundJobs;
public interface IBackgroundJobRepository : IBasicRepository<BackgroundJobRecord, Guid>
{
Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default);
Task<List<BackgroundJobRecord>> GetWaitingListAsync([CanBeNull] string applicationName, int maxResultCount, CancellationToken cancellationToken = default);
}

1
modules/background-jobs/src/Volo.Abp.BackgroundJobs.EntityFrameworkCore/Volo/Abp/BackgroundJobs/EntityFrameworkCore/BackgroundJobsDbContextModelCreatingExtensions.cs

@ -22,6 +22,7 @@ public static class BackgroundJobsDbContextModelCreatingExtensions
b.ConfigureByConvention();
b.Property(x => x.ApplicationName).IsRequired(false).HasMaxLength(BackgroundJobRecordConsts.MaxApplicationNameLength);
b.Property(x => x.JobName).IsRequired().HasMaxLength(BackgroundJobRecordConsts.MaxJobNameLength);
b.Property(x => x.JobArgs).IsRequired().HasMaxLength(BackgroundJobRecordConsts.MaxJobArgsLength);
b.Property(x => x.TryCount).HasDefaultValue(0);

10
modules/background-jobs/src/Volo.Abp.BackgroundJobs.EntityFrameworkCore/Volo/Abp/BackgroundJobs/EntityFrameworkCore/EfCoreBackgroundJobRepository.cs

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Domain.Repositories.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;
@ -22,17 +23,16 @@ public class EfCoreBackgroundJobRepository : EfCoreRepository<IBackgroundJobsDbC
Clock = clock;
}
public virtual async Task<List<BackgroundJobRecord>> GetWaitingListAsync(
int maxResultCount,
CancellationToken cancellationToken = default)
public virtual async Task<List<BackgroundJobRecord>> GetWaitingListAsync([CanBeNull] string applicationName, int maxResultCount, CancellationToken cancellationToken = default)
{
return await (await GetWaitingListQueryAsync(maxResultCount)).ToListAsync(GetCancellationToken(cancellationToken));
return await (await GetWaitingListQueryAsync(applicationName, maxResultCount)).ToListAsync(GetCancellationToken(cancellationToken));
}
protected virtual async Task<IQueryable<BackgroundJobRecord>> GetWaitingListQueryAsync(int maxResultCount)
protected virtual async Task<IQueryable<BackgroundJobRecord>> GetWaitingListQueryAsync([CanBeNull] string applicationName, int maxResultCount)
{
var now = Clock.Now;
return (await GetDbSetAsync())
.Where(t => t.ApplicationName == applicationName)
.Where(t => !t.IsAbandoned && t.NextTryTime <= now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)

10
modules/background-jobs/src/Volo.Abp.BackgroundJobs.MongoDB/Volo/Abp/BackgroundJobs/MongoDB/MongoBackgroundJobRepository.cs

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using JetBrains.Annotations;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Volo.Abp.Domain.Repositories.MongoDB;
@ -23,17 +24,16 @@ public class MongoBackgroundJobRepository : MongoDbRepository<IBackgroundJobsMon
Clock = clock;
}
public virtual async Task<List<BackgroundJobRecord>> GetWaitingListAsync(
int maxResultCount,
CancellationToken cancellationToken = default)
public virtual async Task<List<BackgroundJobRecord>> GetWaitingListAsync([CanBeNull] string applicationName, int maxResultCount, CancellationToken cancellationToken = default)
{
return await (await GetWaitingListQuery(maxResultCount)).ToListAsync(GetCancellationToken(cancellationToken));
return await (await GetWaitingListQuery(applicationName, maxResultCount, cancellationToken)).ToListAsync(GetCancellationToken(cancellationToken));
}
protected virtual async Task<IQueryable<BackgroundJobRecord>> GetWaitingListQuery(int maxResultCount, CancellationToken cancellationToken = default)
protected virtual async Task<IQueryable<BackgroundJobRecord>> GetWaitingListQuery([CanBeNull] string applicationName, int maxResultCount, CancellationToken cancellationToken = default)
{
var now = Clock.Now;
return (await GetQueryableAsync(cancellationToken))
.Where(t => t.ApplicationName == applicationName)
.Where(t => !t.IsAbandoned && t.NextTryTime <= now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)

9
modules/background-jobs/test/Volo.Abp.BackgroundJobs.TestBase/Volo/Abp/BackgroundJobs/BackgroundJobRepository_Tests.cs

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Linq;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Modularity;
using Volo.Abp.Timing;
@ -23,12 +24,16 @@ public abstract class BackgroundJobRepository_Tests<TStartupModule> : Background
[InlineData(5)]
public async Task GetWaitingListAsync(int maxResultCount)
{
var backgroundJobs = await _backgroundJobRepository.GetWaitingListAsync(maxResultCount);
var backgroundJobs = await _backgroundJobRepository.GetWaitingListAsync("App1", maxResultCount);
backgroundJobs.Count.ShouldBeGreaterThan(0);
backgroundJobs.Count.ShouldBeLessThanOrEqualTo(maxResultCount);
backgroundJobs.ForEach(j => j.IsAbandoned.ShouldBeFalse());
backgroundJobs.ForEach(j => j.NextTryTime.ShouldBeLessThanOrEqualTo(_clock.Now.AddSeconds(1))); //1 second tolerance
backgroundJobs.All(j => j.ApplicationName == "App1").ShouldBeTrue();
backgroundJobs.Any(j => j.ApplicationName == "App2").ShouldBeFalse();
backgroundJobs.Any(j => j.ApplicationName == null).ShouldBeFalse();
}
}

3
modules/background-jobs/test/Volo.Abp.BackgroundJobs.TestBase/Volo/Abp/BackgroundJobs/BackgroundJobsTestDataBuilder.cs

@ -26,6 +26,7 @@ public class BackgroundJobsTestDataBuilder : ITransientDependency
await _backgroundJobRepository.InsertAsync(
new BackgroundJobRecord(_testData.JobId1)
{
ApplicationName = "App1",
JobName = "TestJobName",
JobArgs = "{ value: 1 }",
NextTryTime = _clock.Now.Subtract(TimeSpan.FromMinutes(1)),
@ -40,6 +41,7 @@ public class BackgroundJobsTestDataBuilder : ITransientDependency
await _backgroundJobRepository.InsertAsync(
new BackgroundJobRecord(_testData.JobId2)
{
ApplicationName = "App2",
JobName = "TestJobName",
JobArgs = "{ value: 2 }",
NextTryTime = _clock.Now.AddMinutes(42),
@ -54,6 +56,7 @@ public class BackgroundJobsTestDataBuilder : ITransientDependency
await _backgroundJobRepository.InsertAsync(
new BackgroundJobRecord(_testData.JobId3)
{
ApplicationName = "App1",
JobName = "TestJobName",
JobArgs = "{ value: 3 }",
NextTryTime = _clock.Now,

Loading…
Cancel
Save