Browse Source

feat: the job scheduler does not need to centralize processing

pull/549/head
cKey 4 years ago
parent
commit
2f90d90c69
  1. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs
  2. 4
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobInfo.cs
  3. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs
  4. 30
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs
  5. 8
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.TaskManagement/LINGYUN/Abp/BackgroundTasks/TaskManagement/TaskManagementJobPublisher.cs
  6. 6
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundJobManager.cs
  7. 8
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs
  8. 20
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerManager.cs
  9. 2
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs
  10. 3
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application.Contracts/LINGYUN/Abp/TaskManagement/BackgroundJobInfoCreateDto.cs
  11. 14
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs
  12. 1
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/BackgroundJobInfoConsts.cs
  13. 7
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs
  14. 9
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs
  15. 19
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs
  16. 6
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs
  17. 16
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs
  18. 3
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/TaskManagementDbContextModelCreatingExtensions.cs
  19. 2
      aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN.Abp.Webhooks.csproj
  20. 4
      aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/AbpWebhooksModule.cs
  21. 18
      aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/BackgroundWorker/WebhookSenderJob.cs
  22. 10
      aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/WebhookDefinition.cs
  23. 5
      aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/WebhookSenderArgs.cs
  24. 213
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/Migrations/20220405072809_Add-Field-NodeName-With-BackgroundJobInfo.Designer.cs
  25. 27
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/Migrations/20220405072809_Add-Field-NodeName-With-BackgroundJobInfo.cs
  26. 5
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/Migrations/TaskManagementMigrationsDbContextModelSnapshot.cs
  27. 14
      aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/LY.MicroService.WebhooksManagement.HttpApi.Host.csproj
  28. 40
      aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/WebhooksManagementHttpApiHostModule.Configure.cs
  29. 9
      aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/WebhooksManagementHttpApiHostModule.cs
  30. 15
      aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/appsettings.Development.json

4
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/AbpBackgroundTasksOptions.cs

@ -70,6 +70,10 @@ public class AbpBackgroundTasksOptions
/// 轮询任务也属于一个后台任务, 需要对每一次轮询加锁,防止重复任务入库 /// 轮询任务也属于一个后台任务, 需要对每一次轮询加锁,防止重复任务入库
/// </remarks> /// </remarks>
public int JobFetchLockTimeOut { get; set; } public int JobFetchLockTimeOut { get; set; }
/// <summary>
/// 指定运行节点
/// </summary>
public string NodeName { get; set; }
public AbpBackgroundTasksOptions() public AbpBackgroundTasksOptions()
{ {
JobFetchEnabled = true; JobFetchEnabled = true;

4
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobInfo.cs

@ -109,6 +109,10 @@ public class JobInfo
/// 0或更小不生效 /// 0或更小不生效
/// </summary> /// </summary>
public int LockTimeOut { get; set; } public int LockTimeOut { get; set; }
/// <summary>
/// 指定运行节点
/// </summary>
public string NodeName { get; set; }
public int GetCanBeTriggered() public int GetCanBeTriggered()
{ {

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzJobExecutorProvider.cs

@ -47,6 +47,8 @@ public class QuartzJobExecutorProvider : IQuartzJobExecutorProvider, ISingletonD
var jobBuilder = JobBuilder.Create(jobType) var jobBuilder = JobBuilder.Create(jobType)
.WithIdentity(KeyBuilder.CreateJobKey(job)) .WithIdentity(KeyBuilder.CreateJobKey(job))
.WithDescription(job.Description); .WithDescription(job.Description);
// 多节点任务需要
jobBuilder.UsingJobData(nameof(JobInfo.NodeName), job.NodeName);
// 查询任务需要 // 查询任务需要
jobBuilder.UsingJobData(nameof(JobInfo.Id), job.Id); jobBuilder.UsingJobData(nameof(JobInfo.Id), job.Id);
// 有些场景需要 // 有些场景需要

30
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Quartz/LINGYUN/Abp/BackgroundTasks/Quartz/QuartzTriggerListener.cs

@ -1,8 +1,12 @@
using Quartz; using Microsoft.Extensions.Options;
using Quartz;
using Quartz.Listener; using Quartz.Listener;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace LINGYUN.Abp.BackgroundTasks.Quartz; namespace LINGYUN.Abp.BackgroundTasks.Quartz;
@ -12,12 +16,19 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc
public override string Name => "QuartzTriggerListener"; public override string Name => "QuartzTriggerListener";
public ILogger<QuartzTriggerListener> Logger { protected get; set; }
protected AbpBackgroundTasksOptions Options { get; }
protected IJobLockProvider JobLockProvider { get; } protected IJobLockProvider JobLockProvider { get; }
public QuartzTriggerListener( public QuartzTriggerListener(
IJobLockProvider jobLockProvider) IJobLockProvider jobLockProvider,
IOptions<AbpBackgroundTasksOptions> options)
{ {
JobLockProvider = jobLockProvider; JobLockProvider = jobLockProvider;
Options = options.Value;
Logger = NullLogger<QuartzTriggerListener>.Instance;
} }
public override async Task<bool> VetoJobExecution( public override async Task<bool> VetoJobExecution(
@ -25,12 +36,25 @@ public class QuartzTriggerListener : TriggerListenerSupport, ISingletonDependenc
IJobExecutionContext context, IJobExecutionContext context,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
if (!Options.NodeName.IsNullOrWhiteSpace())
{
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.NodeName), out var jobNode);
if (!Equals(Options.NodeName, jobNode))
{
Logger.LogDebug("the job does not belong to the current node and will be ignored by the scheduler.");
return true;
}
}
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId); context.MergedJobDataMap.TryGetValue(nameof(JobInfo.Id), out var jobId);
context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime); context.MergedJobDataMap.TryGetValue(nameof(JobInfo.LockTimeOut), out var lockTime);
if (jobId != null && lockTime != null && int.TryParse(lockTime.ToString(), out var time) && time > 0) if (jobId != null && lockTime != null && int.TryParse(lockTime.ToString(), out var time) && time > 0)
{ {
// 传递令牌将清除本次锁, 那并无意义 // 传递令牌将清除本次锁, 那并无意义
return !await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time); if (!await JobLockProvider.TryLockAsync(NormalizeKey(context, jobId), time))
{
Logger.LogDebug("The exclusive job is already in use by another scheduler. Ignore this schedule.");
return true;
}
} }
return false; return false;

8
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.TaskManagement/LINGYUN/Abp/BackgroundTasks/TaskManagement/TaskManagementJobPublisher.cs

@ -1,4 +1,5 @@
using LINGYUN.Abp.TaskManagement; using LINGYUN.Abp.TaskManagement;
using Microsoft.Extensions.Options;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Data; using Volo.Abp.Data;
@ -8,12 +9,15 @@ namespace LINGYUN.Abp.BackgroundTasks.TaskManagement;
public class TaskManagementJobPublisher : IJobPublisher, ITransientDependency public class TaskManagementJobPublisher : IJobPublisher, ITransientDependency
{ {
protected AbpBackgroundTasksOptions Options { get; }
protected IBackgroundJobInfoAppService BackgroundJobAppService { get; } protected IBackgroundJobInfoAppService BackgroundJobAppService { get; }
public TaskManagementJobPublisher( public TaskManagementJobPublisher(
IBackgroundJobInfoAppService backgroundJobAppService) IBackgroundJobInfoAppService backgroundJobAppService,
IOptions<AbpBackgroundTasksOptions> options)
{ {
BackgroundJobAppService = backgroundJobAppService; BackgroundJobAppService = backgroundJobAppService;
Options = options.Value;
} }
public async virtual Task<bool> PublishAsync(JobInfo job, CancellationToken cancellationToken = default) public async virtual Task<bool> PublishAsync(JobInfo job, CancellationToken cancellationToken = default)
@ -34,7 +38,9 @@ public class TaskManagementJobPublisher : IJobPublisher, ITransientDependency
LockTimeOut = job.LockTimeOut, LockTimeOut = job.LockTimeOut,
IsEnabled = true, IsEnabled = true,
Name = job.Name, Name = job.Name,
Source = job.Source,
Priority = job.Priority, Priority = job.Priority,
NodeName = Options.NodeName,
}; };
await BackgroundJobAppService.CreateAsync(input); await BackgroundJobAppService.CreateAsync(input);

6
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundJobManager.cs

@ -20,6 +20,7 @@ public class BackgroundJobManager : IBackgroundJobManager, ITransientDependency
protected ICurrentTenant CurrentTenant { get; } protected ICurrentTenant CurrentTenant { get; }
protected IGuidGenerator GuidGenerator { get; } protected IGuidGenerator GuidGenerator { get; }
protected IJsonSerializer JsonSerializer { get; } protected IJsonSerializer JsonSerializer { get; }
protected AbpBackgroundTasksOptions TasksOptions { get; }
protected AbpBackgroundJobOptions Options { get; } protected AbpBackgroundJobOptions Options { get; }
public BackgroundJobManager( public BackgroundJobManager(
IClock clock, IClock clock,
@ -28,7 +29,8 @@ public class BackgroundJobManager : IBackgroundJobManager, ITransientDependency
ICurrentTenant currentTenant, ICurrentTenant currentTenant,
IGuidGenerator guidGenerator, IGuidGenerator guidGenerator,
IJsonSerializer jsonSerializer, IJsonSerializer jsonSerializer,
IOptions<AbpBackgroundJobOptions> options) IOptions<AbpBackgroundJobOptions> options,
IOptions<AbpBackgroundTasksOptions> taskOptions)
{ {
Clock = clock; Clock = clock;
JobStore = jobStore; JobStore = jobStore;
@ -37,6 +39,7 @@ public class BackgroundJobManager : IBackgroundJobManager, ITransientDependency
GuidGenerator = guidGenerator; GuidGenerator = guidGenerator;
JsonSerializer = jsonSerializer; JsonSerializer = jsonSerializer;
Options = options.Value; Options = options.Value;
TasksOptions = taskOptions.Value;
} }
public virtual async Task<string> EnqueueAsync<TArgs>( public virtual async Task<string> EnqueueAsync<TArgs>(
@ -74,6 +77,7 @@ public class BackgroundJobManager : IBackgroundJobManager, ITransientDependency
CreationTime = Clock.Now, CreationTime = Clock.Now,
// 确保不会被轮询入队 // 确保不会被轮询入队
Status = JobStatus.None, Status = JobStatus.None,
NodeName = TasksOptions.NodeName,
Type = typeof(BackgroundJobAdapter<TArgs>).AssemblyQualifiedName, Type = typeof(BackgroundJobAdapter<TArgs>).AssemblyQualifiedName,
}; };

8
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerAdapter.cs

@ -4,18 +4,13 @@ using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DynamicProxy; using Volo.Abp.DynamicProxy;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading; using Volo.Abp.Threading;
using Volo.Abp.Timing;
namespace LINGYUN.Abp.BackgroundTasks; namespace LINGYUN.Abp.BackgroundTasks;
public class BackgroundWorkerAdapter<TWorker> : BackgroundWorkerBase, IBackgroundWorkerRunnable public class BackgroundWorkerAdapter<TWorker> : BackgroundWorkerBase, IBackgroundWorkerRunnable
where TWorker : IBackgroundWorker where TWorker : IBackgroundWorker
{ {
protected IClock Clock => LazyServiceProvider.LazyGetRequiredService<IClock>();
protected ICurrentTenant CurrentTenant => LazyServiceProvider.LazyGetRequiredService<ICurrentTenant>();
private readonly MethodInfo _doWorkAsyncMethod; private readonly MethodInfo _doWorkAsyncMethod;
private readonly MethodInfo _doWorkMethod; private readonly MethodInfo _doWorkMethod;
@ -70,12 +65,10 @@ public class BackgroundWorkerAdapter<TWorker> : BackgroundWorkerBase, IBackgroun
return new JobInfo return new JobInfo
{ {
Id = workerType.FullName, Id = workerType.FullName,
TenantId = CurrentTenant.Id,
Name = workerType.FullName, Name = workerType.FullName,
Group = "BackgroundWorkers", Group = "BackgroundWorkers",
Priority = JobPriority.Normal, Priority = JobPriority.Normal,
Source = JobSource.System, Source = JobSource.System,
BeginTime = Clock.Now,
Args = jobArgs, Args = jobArgs,
Description = "From the framework background workers", Description = "From the framework background workers",
JobType = JobType.Persistent, JobType = JobType.Persistent,
@ -83,7 +76,6 @@ public class BackgroundWorkerAdapter<TWorker> : BackgroundWorkerBase, IBackgroun
MaxCount = 0, MaxCount = 0,
// TODO: 可配置 // TODO: 可配置
MaxTryCount = 10, MaxTryCount = 10,
CreationTime = Clock.Now,
// 确保不会被轮询入队 // 确保不会被轮询入队
Status = JobStatus.None, Status = JobStatus.None,
Type = typeof(BackgroundWorkerAdapter<TWorker>).AssemblyQualifiedName, Type = typeof(BackgroundWorkerAdapter<TWorker>).AssemblyQualifiedName,

20
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/BackgroundWorkerManager.cs

@ -1,24 +1,36 @@
using System; using Microsoft.Extensions.Options;
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy; using Volo.Abp.DynamicProxy;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Timing;
namespace LINGYUN.Abp.BackgroundTasks; namespace LINGYUN.Abp.BackgroundTasks;
[Dependency(ReplaceServices = true)] [Dependency(ReplaceServices = true)]
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency
{ {
protected IClock Clock { get; }
protected IJobStore JobStore { get; } protected IJobStore JobStore { get; }
protected IJobPublisher JobPublisher { get; } protected IJobPublisher JobPublisher { get; }
protected ICurrentTenant CurrentTenant { get; }
protected AbpBackgroundTasksOptions Options { get; }
public BackgroundWorkerManager( public BackgroundWorkerManager(
IClock clock,
IJobStore jobStore, IJobStore jobStore,
IJobPublisher jobPublisher) IJobPublisher jobPublisher,
ICurrentTenant currentTenant,
IOptions<AbpBackgroundTasksOptions> options)
{ {
Clock = clock;
JobStore = jobStore; JobStore = jobStore;
JobPublisher = jobPublisher; JobPublisher = jobPublisher;
CurrentTenant = currentTenant;
Options = options.Value;
} }
public async Task AddAsync(IBackgroundWorker worker) public async Task AddAsync(IBackgroundWorker worker)
@ -34,6 +46,10 @@ public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDepen
return; return;
} }
jobInfo.NodeName = Options.NodeName;
jobInfo.BeginTime = Clock.Now;
jobInfo.CreationTime = Clock.Now;
jobInfo.TenantId = CurrentTenant.Id;
// 存储状态 // 存储状态
await JobStore.StoreAsync(jobInfo); await JobStore.StoreAsync(jobInfo);

2
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/DefaultBackgroundWorker.cs

@ -64,6 +64,7 @@ internal class DefaultBackgroundWorker : BackgroundService
Priority = JobPriority.High, Priority = JobPriority.High,
Source = JobSource.System, Source = JobSource.System,
LockTimeOut = _options.JobFetchLockTimeOut, LockTimeOut = _options.JobFetchLockTimeOut,
NodeName = _options.NodeName,
Type = typeof(BackgroundPollingJob).AssemblyQualifiedName, Type = typeof(BackgroundPollingJob).AssemblyQualifiedName,
}; };
} }
@ -84,6 +85,7 @@ internal class DefaultBackgroundWorker : BackgroundService
JobType = JobType.Period, JobType = JobType.Period,
Priority = JobPriority.High, Priority = JobPriority.High,
Source = JobSource.System, Source = JobSource.System,
NodeName = _options.NodeName,
Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName, Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName,
}; };
} }

3
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application.Contracts/LINGYUN/Abp/TaskManagement/BackgroundJobInfoCreateDto.cs

@ -25,6 +25,9 @@ public class BackgroundJobInfoCreateDto : BackgroundJobInfoCreateOrUpdateDto
[Required] [Required]
[DynamicStringLength(typeof(BackgroundJobInfoConsts), nameof(BackgroundJobInfoConsts.MaxTypeLength))] [DynamicStringLength(typeof(BackgroundJobInfoConsts), nameof(BackgroundJobInfoConsts.MaxTypeLength))]
public string Type { get; set; } public string Type { get; set; }
[DynamicStringLength(typeof(BackgroundJobInfoConsts), nameof(BackgroundJobInfoConsts.MaxNodeNameLength))]
public string NodeName { get; set; }
/// <summary> /// <summary>
/// 开始时间 /// 开始时间
/// </summary> /// </summary>

14
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Application/LINGYUN/Abp/TaskManagement/BackgroundJobInfoAppService.cs

@ -1,6 +1,7 @@
using LINGYUN.Abp.BackgroundTasks; using LINGYUN.Abp.BackgroundTasks;
using LINGYUN.Abp.TaskManagement.Permissions; using LINGYUN.Abp.TaskManagement.Permissions;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Options;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -13,15 +14,18 @@ namespace LINGYUN.Abp.TaskManagement;
[Authorize(TaskManagementPermissions.BackgroundJobs.Default)] [Authorize(TaskManagementPermissions.BackgroundJobs.Default)]
public class BackgroundJobInfoAppService : TaskManagementApplicationService, IBackgroundJobInfoAppService public class BackgroundJobInfoAppService : TaskManagementApplicationService, IBackgroundJobInfoAppService
{ {
protected AbpBackgroundTasksOptions Options { get; }
protected BackgroundJobManager BackgroundJobManager { get; } protected BackgroundJobManager BackgroundJobManager { get; }
protected IBackgroundJobInfoRepository BackgroundJobInfoRepository { get; } protected IBackgroundJobInfoRepository BackgroundJobInfoRepository { get; }
public BackgroundJobInfoAppService( public BackgroundJobInfoAppService(
BackgroundJobManager backgroundJobManager, BackgroundJobManager backgroundJobManager,
IBackgroundJobInfoRepository backgroundJobInfoRepository) IBackgroundJobInfoRepository backgroundJobInfoRepository,
IOptions<AbpBackgroundTasksOptions> options)
{ {
BackgroundJobManager = backgroundJobManager; BackgroundJobManager = backgroundJobManager;
BackgroundJobInfoRepository = backgroundJobInfoRepository; BackgroundJobInfoRepository = backgroundJobInfoRepository;
Options = options.Value;
} }
[Authorize(TaskManagementPermissions.BackgroundJobs.Create)] [Authorize(TaskManagementPermissions.BackgroundJobs.Create)]
@ -47,16 +51,14 @@ public class BackgroundJobInfoAppService : TaskManagementApplicationService, IBa
input.Source, input.Source,
input.MaxCount, input.MaxCount,
input.MaxTryCount, input.MaxTryCount,
input.NodeName ?? Options.NodeName,
CurrentTenant.Id); CurrentTenant.Id);
UpdateByInput(backgroundJobInfo, input); UpdateByInput(backgroundJobInfo, input);
await BackgroundJobInfoRepository.InsertAsync(backgroundJobInfo, autoSave: true); await BackgroundJobManager.CreateAsync(backgroundJobInfo);
if (backgroundJobInfo.IsEnabled && backgroundJobInfo.JobType == JobType.Period) await CurrentUnitOfWork.SaveChangesAsync();
{
await BackgroundJobManager.QueueAsync(backgroundJobInfo);
}
return ObjectMapper.Map<BackgroundJobInfo, BackgroundJobInfoDto>(backgroundJobInfo); return ObjectMapper.Map<BackgroundJobInfo, BackgroundJobInfoDto>(backgroundJobInfo);
} }

1
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/BackgroundJobInfoConsts.cs

@ -5,6 +5,7 @@ public static class BackgroundJobInfoConsts
public static int MaxCronLength { get; set; } = 50; public static int MaxCronLength { get; set; } = 50;
public static int MaxNameLength { get; set; } = 100; public static int MaxNameLength { get; set; } = 100;
public static int MaxGroupLength { get; set; } = 50; public static int MaxGroupLength { get; set; } = 50;
public static int MaxNodeNameLength { get; set; } = 128;
public static int MaxTypeLength { get; set; } = 1000; public static int MaxTypeLength { get; set; } = 1000;
public static int MaxDescriptionLength { get; set; } = 255; public static int MaxDescriptionLength { get; set; } = 255;
public static int MaxResultLength { get; set; } = 1000; public static int MaxResultLength { get; set; } = 1000;

7
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobInfo.cs

@ -107,7 +107,10 @@ public class BackgroundJobInfo : AuditedAggregateRoot<string>, IMultiTenant
/// 连续失败且不会再次执行 /// 连续失败且不会再次执行
/// </summary> /// </summary>
public virtual bool IsAbandoned { get; set; } public virtual bool IsAbandoned { get; set; }
/// <summary>
/// 指定作业运行节点
/// </summary>
public virtual string NodeName { get; protected set; }
protected BackgroundJobInfo() { } protected BackgroundJobInfo() { }
public BackgroundJobInfo( public BackgroundJobInfo(
@ -122,6 +125,7 @@ public class BackgroundJobInfo : AuditedAggregateRoot<string>, IMultiTenant
JobSource source = JobSource.None, JobSource source = JobSource.None,
int maxCount = 0, int maxCount = 0,
int maxTryCount = 50, int maxTryCount = 50,
string nodeName = null,
Guid? tenantId = null) : base(id) Guid? tenantId = null) : base(id)
{ {
Name = Check.NotNullOrWhiteSpace(name, nameof(name), BackgroundJobInfoConsts.MaxNameLength); Name = Check.NotNullOrWhiteSpace(name, nameof(name), BackgroundJobInfoConsts.MaxNameLength);
@ -134,6 +138,7 @@ public class BackgroundJobInfo : AuditedAggregateRoot<string>, IMultiTenant
MaxCount = maxCount; MaxCount = maxCount;
MaxTryCount = maxTryCount; MaxTryCount = maxTryCount;
NodeName = Check.Length(nodeName, nameof(nodeName), BackgroundJobInfoConsts.MaxNodeNameLength);
TenantId = tenantId; TenantId = tenantId;
Status = JobStatus.Running; Status = JobStatus.Running;

9
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs

@ -1,7 +1,6 @@
using LINGYUN.Abp.BackgroundTasks; using LINGYUN.Abp.BackgroundTasks;
using System.Threading.Tasks;
using System.Collections.Generic; using System.Collections.Generic;
using Volo.Abp; using System.Threading.Tasks;
using Volo.Abp.Domain.Services; using Volo.Abp.Domain.Services;
using Volo.Abp.ObjectMapping; using Volo.Abp.ObjectMapping;
using Volo.Abp.Uow; using Volo.Abp.Uow;
@ -34,12 +33,6 @@ public class BackgroundJobManager : DomainService
if (jobInfo.IsEnabled && jobInfo.JobType == JobType.Period) if (jobInfo.IsEnabled && jobInfo.JobType == JobType.Period)
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo); var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo);
if (await JobScheduler.ExistsAsync(job))
{
throw new BusinessException(TaskManagementErrorCodes.JobNameAlreadyExists)
.WithData("Group", job.Group)
.WithData("Name", job.Name);
}
UnitOfWorkManager.Current.OnCompleted(async () => UnitOfWorkManager.Current.OnCompleted(async () =>
{ {
await JobScheduler.QueueAsync(job); await JobScheduler.QueueAsync(job);

19
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs

@ -1,9 +1,9 @@
using LINGYUN.Abp.BackgroundTasks; using LINGYUN.Abp.BackgroundTasks;
using Microsoft.Extensions.Options;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Data;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.ObjectMapping; using Volo.Abp.ObjectMapping;
@ -19,28 +19,32 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
protected IBackgroundJobInfoRepository JobInfoRepository { get; } protected IBackgroundJobInfoRepository JobInfoRepository { get; }
protected IBackgroundJobLogRepository JobLogRepository { get; } protected IBackgroundJobLogRepository JobLogRepository { get; }
protected AbpBackgroundTasksOptions Options { get; }
public BackgroundJobStore( public BackgroundJobStore(
IObjectMapper objectMapper, IObjectMapper objectMapper,
ICurrentTenant currentTenant, ICurrentTenant currentTenant,
IBackgroundJobInfoRepository jobInfoRepository, IBackgroundJobInfoRepository jobInfoRepository,
IBackgroundJobLogRepository jobLogRepository) IBackgroundJobLogRepository jobLogRepository,
IOptions<AbpBackgroundTasksOptions> options)
{ {
ObjectMapper = objectMapper; ObjectMapper = objectMapper;
CurrentTenant = currentTenant; CurrentTenant = currentTenant;
JobInfoRepository = jobInfoRepository; JobInfoRepository = jobInfoRepository;
JobLogRepository = jobLogRepository; JobLogRepository = jobLogRepository;
Options = options.Value;
} }
public async virtual Task<List<JobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default) public async virtual Task<List<JobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
{ {
var jobInfos = await JobInfoRepository.GetAllPeriodTasksAsync(cancellationToken); var jobInfos = await JobInfoRepository.GetAllPeriodTasksAsync(Options.NodeName, cancellationToken);
return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos); return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos);
} }
public async virtual Task<List<JobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default) public async virtual Task<List<JobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{ {
var jobInfos = await JobInfoRepository.GetWaitingListAsync(maxResultCount, cancellationToken); var jobInfos = await JobInfoRepository.GetWaitingListAsync(Options.NodeName, maxResultCount, cancellationToken);
return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos); return ObjectMapper.Map<List<BackgroundJobInfo>, List<JobInfo>>(jobInfos);
} }
@ -142,9 +146,10 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var jobs = await JobInfoRepository.GetExpiredJobsAsync( var jobs = await JobInfoRepository.GetExpiredJobsAsync(
maxResultCount, Options.NodeName,
jobExpiratime, maxResultCount,
cancellationToken); jobExpiratime,
cancellationToken);
await JobInfoRepository.DeleteManyAsync(jobs, cancellationToken: cancellationToken); await JobInfoRepository.DeleteManyAsync(jobs, cancellationToken: cancellationToken);
} }

6
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/IBackgroundJobInfoRepository.cs

@ -27,11 +27,13 @@ public interface IBackgroundJobInfoRepository : IRepository<BackgroundJobInfo, s
/// <summary> /// <summary>
/// 获取过期任务列表 /// 获取过期任务列表
/// </summary> /// </summary>
/// <param name="nodeName"></param>
/// <param name="maxResultCount"></param> /// <param name="maxResultCount"></param>
/// <param name="jobExpiratime"></param> /// <param name="jobExpiratime"></param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns></returns> /// <returns></returns>
Task<List<BackgroundJobInfo>> GetExpiredJobsAsync( Task<List<BackgroundJobInfo>> GetExpiredJobsAsync(
string nodeName,
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
@ -39,16 +41,20 @@ public interface IBackgroundJobInfoRepository : IRepository<BackgroundJobInfo, s
/// 获取所有周期性任务 /// 获取所有周期性任务
/// 指定了Cron表达式的任务需要作为持续性任务交给任务引擎 /// 指定了Cron表达式的任务需要作为持续性任务交给任务引擎
/// </summary> /// </summary>
/// <param name="nodeName"></param>
/// <returns></returns> /// <returns></returns>
Task<List<BackgroundJobInfo>> GetAllPeriodTasksAsync( Task<List<BackgroundJobInfo>> GetAllPeriodTasksAsync(
string nodeName,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// 获取等待入队的任务列表 /// 获取等待入队的任务列表
/// </summary> /// </summary>
/// <param name="nodeName"></param>
/// <param name="maxResultCount"></param> /// <param name="maxResultCount"></param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns></returns> /// <returns></returns>
Task<List<BackgroundJobInfo>> GetWaitingListAsync( Task<List<BackgroundJobInfo>> GetWaitingListAsync(
string nodeName,
int maxResultCount, int maxResultCount,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
/// <summary> /// <summary>

16
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/EfCoreBackgroundJobInfoRepository.cs

@ -69,12 +69,14 @@ public class EfCoreBackgroundJobInfoRepository :
Result = x.Result, Result = x.Result,
TriggerCount = x.TriggerCount, TriggerCount = x.TriggerCount,
TryCount = x.TryCount, TryCount = x.TryCount,
Type = x.Type Type = x.Type,
NodeName = x.NodeName,
}) })
.FirstOrDefaultAsync(GetCancellationToken(cancellationToken)); .FirstOrDefaultAsync(GetCancellationToken(cancellationToken));
} }
public virtual async Task<List<BackgroundJobInfo>> GetExpiredJobsAsync( public virtual async Task<List<BackgroundJobInfo>> GetExpiredJobsAsync(
string nodeName,
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
@ -82,6 +84,7 @@ public class EfCoreBackgroundJobInfoRepository :
var expiratime = Clock.Now - jobExpiratime; var expiratime = Clock.Now - jobExpiratime;
return await (await GetDbSetAsync()) return await (await GetDbSetAsync())
.Where(x => x.NodeName == nodeName)
.Where(x => x.Status == JobStatus.Completed && .Where(x => x.Status == JobStatus.Completed &&
DateTime.Compare(x.LastRunTime.Value, expiratime) <= 0) DateTime.Compare(x.LastRunTime.Value, expiratime) <= 0)
.OrderBy(x => x.CreationTime) .OrderBy(x => x.CreationTime)
@ -89,11 +92,14 @@ public class EfCoreBackgroundJobInfoRepository :
.ToListAsync(GetCancellationToken(cancellationToken)); .ToListAsync(GetCancellationToken(cancellationToken));
} }
public virtual async Task<List<BackgroundJobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default) public virtual async Task<List<BackgroundJobInfo>> GetAllPeriodTasksAsync(
string nodeName,
CancellationToken cancellationToken = default)
{ {
var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
return await (await GetDbSetAsync()) return await (await GetDbSetAsync())
.Where(x => x.NodeName == nodeName)
.Where(x => x.IsEnabled && !x.IsAbandoned) .Where(x => x.IsEnabled && !x.IsAbandoned)
.Where(x => x.JobType == JobType.Period && status.Contains(x.Status)) .Where(x => x.JobType == JobType.Period && status.Contains(x.Status))
.Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount)) .Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
@ -116,12 +122,16 @@ public class EfCoreBackgroundJobInfoRepository :
.ToListAsync(GetCancellationToken(cancellationToken)); .ToListAsync(GetCancellationToken(cancellationToken));
} }
public virtual async Task<List<BackgroundJobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default) public virtual async Task<List<BackgroundJobInfo>> GetWaitingListAsync(
string nodeName,
int maxResultCount,
CancellationToken cancellationToken = default)
{ {
var now = Clock.Now; var now = Clock.Now;
var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
return await (await GetDbSetAsync()) return await (await GetDbSetAsync())
.Where(x => x.NodeName == nodeName)
.Where(x => x.IsEnabled && !x.IsAbandoned) .Where(x => x.IsEnabled && !x.IsAbandoned)
.Where(x => x.JobType != JobType.Period && status.Contains(x.Status)) .Where(x => x.JobType != JobType.Period && status.Contains(x.Status))
.Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount)) .Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))

3
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.EntityFrameworkCore/LINGYUN/Abp/TaskManagement/EntityFrameworkCore/TaskManagementDbContextModelCreatingExtensions.cs

@ -47,6 +47,9 @@ public static class TaskManagementDbContextModelCreatingExtensions
b.Property(p => p.Result) b.Property(p => p.Result)
.HasColumnName(nameof(BackgroundJobInfo.Result)) .HasColumnName(nameof(BackgroundJobInfo.Result))
.HasMaxLength(BackgroundJobInfoConsts.MaxResultLength); .HasMaxLength(BackgroundJobInfoConsts.MaxResultLength);
b.Property(p => p.NodeName)
.HasColumnName(nameof(BackgroundJobInfo.NodeName))
.HasMaxLength(BackgroundJobInfoConsts.MaxNodeNameLength);
b.Property(p => p.Args) b.Property(p => p.Args)
.HasColumnName(nameof(BackgroundJobInfo.Args)) .HasColumnName(nameof(BackgroundJobInfo.Args))
.HasConversion(new ExtraPropertiesValueConverter(b.Metadata.ClrType)) .HasConversion(new ExtraPropertiesValueConverter(b.Metadata.ClrType))

2
aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN.Abp.Webhooks.csproj

@ -9,7 +9,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Volo.Abp.BackgroundJobs" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.BackgroundJobs.Abstractions" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Features" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.Features" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Guids" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.Guids" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Http.Client" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.Http.Client" Version="$(VoloAbpPackageVersion)" />

4
aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/AbpWebhooksModule.cs

@ -9,9 +9,7 @@ using Volo.Abp.Modularity;
namespace LINGYUN.Abp.Webhooks; namespace LINGYUN.Abp.Webhooks;
//[DependsOn(typeof(AbpBackgroundJobsAbstractionsModule))] [DependsOn(typeof(AbpBackgroundJobsAbstractionsModule))]
// 防止未引用实现无法发布到后台作业
[DependsOn(typeof(AbpBackgroundJobsModule))]
[DependsOn(typeof(AbpFeaturesModule))] [DependsOn(typeof(AbpFeaturesModule))]
[DependsOn(typeof(AbpGuidsModule))] [DependsOn(typeof(AbpGuidsModule))]
[DependsOn(typeof(AbpHttpClientModule))] [DependsOn(typeof(AbpHttpClientModule))]

18
aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/BackgroundWorker/WebhookSenderJob.cs

@ -11,7 +11,6 @@ namespace LINGYUN.Abp.Webhooks.BackgroundWorker
public class WebhookSenderJob : AsyncBackgroundJob<WebhookSenderArgs>, ITransientDependency public class WebhookSenderJob : AsyncBackgroundJob<WebhookSenderArgs>, ITransientDependency
{ {
private readonly IUnitOfWorkManager _unitOfWorkManager; private readonly IUnitOfWorkManager _unitOfWorkManager;
private readonly IWebhookDefinitionManager _webhookDefinitionManager;
private readonly IWebhookSubscriptionManager _webhookSubscriptionManager; private readonly IWebhookSubscriptionManager _webhookSubscriptionManager;
private readonly IWebhookSendAttemptStore _webhookSendAttemptStore; private readonly IWebhookSendAttemptStore _webhookSendAttemptStore;
private readonly IWebhookSender _webhookSender; private readonly IWebhookSender _webhookSender;
@ -20,14 +19,12 @@ namespace LINGYUN.Abp.Webhooks.BackgroundWorker
public WebhookSenderJob( public WebhookSenderJob(
IUnitOfWorkManager unitOfWorkManager, IUnitOfWorkManager unitOfWorkManager,
IWebhookDefinitionManager webhookDefinitionManager,
IWebhookSubscriptionManager webhookSubscriptionManager, IWebhookSubscriptionManager webhookSubscriptionManager,
IWebhookSendAttemptStore webhookSendAttemptStore, IWebhookSendAttemptStore webhookSendAttemptStore,
IWebhookSender webhookSender, IWebhookSender webhookSender,
IOptions<AbpWebhooksOptions> options) IOptions<AbpWebhooksOptions> options)
{ {
_unitOfWorkManager = unitOfWorkManager; _unitOfWorkManager = unitOfWorkManager;
_webhookDefinitionManager = webhookDefinitionManager;
_webhookSubscriptionManager = webhookSubscriptionManager; _webhookSubscriptionManager = webhookSubscriptionManager;
_webhookSendAttemptStore = webhookSendAttemptStore; _webhookSendAttemptStore = webhookSendAttemptStore;
_webhookSender = webhookSender; _webhookSender = webhookSender;
@ -36,13 +33,11 @@ namespace LINGYUN.Abp.Webhooks.BackgroundWorker
public override async Task ExecuteAsync(WebhookSenderArgs args) public override async Task ExecuteAsync(WebhookSenderArgs args)
{ {
var webhookDefinition = _webhookDefinitionManager.Get(args.WebhookName); if (args.TryOnce)
if (webhookDefinition.TryOnce)
{ {
try try
{ {
await SendWebhook(args, webhookDefinition); await SendWebhook(args);
} }
catch (Exception e) catch (Exception e)
{ {
@ -52,11 +47,11 @@ namespace LINGYUN.Abp.Webhooks.BackgroundWorker
} }
else else
{ {
await SendWebhook(args, webhookDefinition); await SendWebhook(args);
} }
} }
private async Task SendWebhook(WebhookSenderArgs args, WebhookDefinition webhookDefinition) private async Task SendWebhook(WebhookSenderArgs args)
{ {
if (args.WebhookEventId == default) if (args.WebhookEventId == default)
{ {
@ -68,7 +63,7 @@ namespace LINGYUN.Abp.Webhooks.BackgroundWorker
return; return;
} }
if (!webhookDefinition.TryOnce) if (!args.TryOnce)
{ {
var sendAttemptCount = await _webhookSendAttemptStore.GetSendAttemptCountAsync( var sendAttemptCount = await _webhookSendAttemptStore.GetSendAttemptCountAsync(
args.TenantId, args.TenantId,
@ -76,8 +71,7 @@ namespace LINGYUN.Abp.Webhooks.BackgroundWorker
args.WebhookSubscriptionId args.WebhookSubscriptionId
); );
if ((webhookDefinition.MaxSendAttemptCount > 0 && sendAttemptCount > webhookDefinition.MaxSendAttemptCount) || if (sendAttemptCount > _options.MaxSendAttemptCount)
sendAttemptCount > _options.MaxSendAttemptCount)
{ {
return; return;
} }

10
aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/WebhookDefinition.cs

@ -11,16 +11,6 @@ namespace LINGYUN.Abp.Webhooks
/// </summary> /// </summary>
public string Name { get; } public string Name { get; }
/// <summary>
/// Tries to send webhook only one time without checking to send attempt count
/// </summary>
public bool TryOnce { get; set; }
/// <summary>
/// Defined maximum number of sending times
/// </summary>
public int MaxSendAttemptCount { get; set; }
/// <summary> /// <summary>
/// Display name of the webhook. /// Display name of the webhook.
/// Optional. /// Optional.

5
aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks/LINGYUN/Abp/Webhooks/WebhookSenderArgs.cs

@ -46,6 +46,11 @@ namespace LINGYUN.Abp.Webhooks
/// </summary> /// </summary>
public IDictionary<string, string> Headers { get; set; } public IDictionary<string, string> Headers { get; set; }
/// <summary>
/// Tries to send webhook only one time without checking to send attempt count
/// </summary>
public bool TryOnce { get; set; }
/// <summary> /// <summary>
/// True: It sends the exact same data as the parameter to clients. /// True: It sends the exact same data as the parameter to clients.
/// <para> /// <para>

213
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/Migrations/20220405072809_Add-Field-NodeName-With-BackgroundJobInfo.Designer.cs

@ -0,0 +1,213 @@
// <auto-generated />
using System;
using LY.MicroService.TaskManagement.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Volo.Abp.EntityFrameworkCore;
#nullable disable
namespace LY.MicroService.TaskManagement.Migrations
{
[DbContext(typeof(TaskManagementMigrationsDbContext))]
[Migration("20220405072809_Add-Field-NodeName-With-BackgroundJobInfo")]
partial class AddFieldNodeNameWithBackgroundJobInfo
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("_Abp_DatabaseProvider", EfCoreDatabaseProvider.MySql)
.HasAnnotation("ProductVersion", "6.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 64);
modelBuilder.Entity("LINGYUN.Abp.TaskManagement.BackgroundJobInfo", b =>
{
b.Property<string>("Id")
.HasColumnType("varchar(255)");
b.Property<string>("Args")
.HasColumnType("longtext")
.HasColumnName("Args");
b.Property<DateTime>("BeginTime")
.HasColumnType("datetime(6)");
b.Property<string>("ConcurrencyStamp")
.IsConcurrencyToken()
.HasMaxLength(40)
.HasColumnType("varchar(40)")
.HasColumnName("ConcurrencyStamp");
b.Property<DateTime>("CreationTime")
.HasColumnType("datetime(6)")
.HasColumnName("CreationTime");
b.Property<Guid?>("CreatorId")
.HasColumnType("char(36)")
.HasColumnName("CreatorId");
b.Property<string>("Cron")
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasColumnName("Cron");
b.Property<string>("Description")
.HasMaxLength(255)
.HasColumnType("varchar(255)")
.HasColumnName("Description");
b.Property<DateTime?>("EndTime")
.HasColumnType("datetime(6)");
b.Property<string>("ExtraProperties")
.HasColumnType("longtext")
.HasColumnName("ExtraProperties");
b.Property<string>("Group")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasColumnName("Group");
b.Property<int>("Interval")
.HasColumnType("int");
b.Property<bool>("IsAbandoned")
.HasColumnType("tinyint(1)");
b.Property<bool>("IsEnabled")
.HasColumnType("tinyint(1)");
b.Property<int>("JobType")
.HasColumnType("int");
b.Property<DateTime?>("LastModificationTime")
.HasColumnType("datetime(6)")
.HasColumnName("LastModificationTime");
b.Property<Guid?>("LastModifierId")
.HasColumnType("char(36)")
.HasColumnName("LastModifierId");
b.Property<DateTime?>("LastRunTime")
.HasColumnType("datetime(6)");
b.Property<int>("LockTimeOut")
.HasColumnType("int");
b.Property<int>("MaxCount")
.HasColumnType("int");
b.Property<int>("MaxTryCount")
.HasColumnType("int");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("varchar(100)")
.HasColumnName("Name");
b.Property<DateTime?>("NextRunTime")
.HasColumnType("datetime(6)");
b.Property<string>("NodeName")
.HasMaxLength(128)
.HasColumnType("varchar(128)")
.HasColumnName("NodeName");
b.Property<int>("Priority")
.HasColumnType("int");
b.Property<string>("Result")
.HasMaxLength(1000)
.HasColumnType("varchar(1000)")
.HasColumnName("Result");
b.Property<int>("Source")
.ValueGeneratedOnAdd()
.HasColumnType("int")
.HasDefaultValue(-1)
.HasColumnName("Source");
b.Property<int>("Status")
.HasColumnType("int");
b.Property<Guid?>("TenantId")
.HasColumnType("char(36)")
.HasColumnName("TenantId");
b.Property<int>("TriggerCount")
.HasColumnType("int");
b.Property<int>("TryCount")
.HasColumnType("int");
b.Property<string>("Type")
.IsRequired()
.HasMaxLength(1000)
.HasColumnType("varchar(1000)")
.HasColumnName("Type");
b.HasKey("Id");
b.HasIndex("Name", "Group");
b.ToTable("TK_BackgroundJobs", (string)null);
});
modelBuilder.Entity("LINGYUN.Abp.TaskManagement.BackgroundJobLog", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
b.Property<string>("Exception")
.HasMaxLength(2000)
.HasColumnType("varchar(2000)")
.HasColumnName("Exception");
b.Property<string>("JobGroup")
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasColumnName("JobGroup");
b.Property<string>("JobId")
.HasMaxLength(255)
.HasColumnType("varchar(255)")
.HasColumnName("JobId");
b.Property<string>("JobName")
.HasMaxLength(100)
.HasColumnType("varchar(100)")
.HasColumnName("JobName");
b.Property<string>("JobType")
.HasMaxLength(1000)
.HasColumnType("varchar(1000)")
.HasColumnName("JobType");
b.Property<string>("Message")
.HasMaxLength(1000)
.HasColumnType("varchar(1000)")
.HasColumnName("Message");
b.Property<DateTime>("RunTime")
.HasColumnType("datetime(6)");
b.Property<Guid?>("TenantId")
.HasColumnType("char(36)")
.HasColumnName("TenantId");
b.HasKey("Id");
b.HasIndex("JobGroup", "JobName");
b.ToTable("TK_BackgroundJobLogs", (string)null);
});
#pragma warning restore 612, 618
}
}
}

27
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/Migrations/20220405072809_Add-Field-NodeName-With-BackgroundJobInfo.cs

@ -0,0 +1,27 @@
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace LY.MicroService.TaskManagement.Migrations
{
public partial class AddFieldNodeNameWithBackgroundJobInfo : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<string>(
name: "NodeName",
table: "TK_BackgroundJobs",
type: "varchar(128)",
maxLength: 128,
nullable: true)
.Annotation("MySql:CharSet", "utf8mb4");
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropColumn(
name: "NodeName",
table: "TK_BackgroundJobs");
}
}
}

5
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/Migrations/TaskManagementMigrationsDbContextModelSnapshot.cs

@ -111,6 +111,11 @@ namespace LY.MicroService.TaskManagement.Migrations
b.Property<DateTime?>("NextRunTime") b.Property<DateTime?>("NextRunTime")
.HasColumnType("datetime(6)"); .HasColumnType("datetime(6)");
b.Property<string>("NodeName")
.HasMaxLength(128)
.HasColumnType("varchar(128)")
.HasColumnName("NodeName");
b.Property<int>("Priority") b.Property<int>("Priority")
.HasColumnType("int"); .HasColumnType("int");

14
aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/LY.MicroService.WebhooksManagement.HttpApi.Host.csproj

@ -34,6 +34,7 @@
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="$(OpenTelemetryInstrumentationAspNetCorePackageVersion)" /> <PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="$(OpenTelemetryInstrumentationAspNetCorePackageVersion)" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="$(OpenTelemetryInstrumentationHttpPackageVersion)" /> <PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="$(OpenTelemetryInstrumentationHttpPackageVersion)" />
<PackageReference Include="OpenTelemetry.Contrib.Instrumentation.EntityFrameworkCore" Version="$(OpenTelemetryContribInstrumentationEntityFrameworkCorePackageVersion)" /> <PackageReference Include="OpenTelemetry.Contrib.Instrumentation.EntityFrameworkCore" Version="$(OpenTelemetryContribInstrumentationEntityFrameworkCorePackageVersion)" />
<PackageReference Include="Quartz.Serialization.Json" Version="$(QuartzNETPackageVersion)" />
<PackageReference Include="Serilog.AspNetCore" Version="$(SerilogAspNetCorePackageVersion)" /> <PackageReference Include="Serilog.AspNetCore" Version="$(SerilogAspNetCorePackageVersion)" />
<PackageReference Include="Serilog.Enrichers.Environment" Version="$(SerilogEnrichersEnvironmentPackageVersion)" /> <PackageReference Include="Serilog.Enrichers.Environment" Version="$(SerilogEnrichersEnvironmentPackageVersion)" />
<PackageReference Include="Serilog.Enrichers.Assembly" Version="$(SerilogEnrichersAssemblyPackageVersion)" /> <PackageReference Include="Serilog.Enrichers.Assembly" Version="$(SerilogEnrichersAssemblyPackageVersion)" />
@ -57,11 +58,14 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.Webhooks.Identity\LINGYUN.Abp.Webhooks.Identity.csproj" /> <ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.Webhooks.Identity\LINGYUN.Abp.Webhooks.Identity.csproj" />
<ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.Webhooks.Saas\LINGYUN.Abp.Webhooks.Saas.csproj" /> <ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.Webhooks.Saas\LINGYUN.Abp.Webhooks.Saas.csproj" />
<ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.WebhooksManagement.Application\LINGYUN.Abp.WebhooksManagement.Application.csproj" /> <ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.WebhooksManagement.Application\LINGYUN.Abp.WebhooksManagement.Application.csproj" />
<ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.WebhooksManagement.EntityFrameworkCore\LINGYUN.Abp.WebhooksManagement.EntityFrameworkCore.csproj" /> <ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.WebhooksManagement.EntityFrameworkCore\LINGYUN.Abp.WebhooksManagement.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.WebhooksManagement.HttpApi\LINGYUN.Abp.WebhooksManagement.HttpApi.csproj" /> <ProjectReference Include="..\..\modules\webhooks\LINGYUN.Abp.WebhooksManagement.HttpApi\LINGYUN.Abp.WebhooksManagement.HttpApi.csproj" />
<ProjectReference Include="..\..\modules\task-management\LINGYUN.Abp.BackgroundTasks.Quartz\LINGYUN.Abp.BackgroundTasks.Quartz.csproj" />
<ProjectReference Include="..\..\modules\task-management\LINGYUN.Abp.BackgroundTasks.ExceptionHandling\LINGYUN.Abp.BackgroundTasks.ExceptionHandling.csproj" />
<ProjectReference Include="..\..\modules\task-management\LINGYUN.Abp.TaskManagement.EntityFrameworkCore\LINGYUN.Abp.TaskManagement.EntityFrameworkCore.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

40
aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/WebhooksManagementHttpApiHostModule.Configure.cs

@ -1,4 +1,5 @@
using DotNetCore.CAP; using DotNetCore.CAP;
using LINGYUN.Abp.BackgroundTasks;
using LINGYUN.Abp.Dapr.Client.DynamicProxying; using LINGYUN.Abp.Dapr.Client.DynamicProxying;
using LINGYUN.Abp.ExceptionHandling; using LINGYUN.Abp.ExceptionHandling;
using LINGYUN.Abp.ExceptionHandling.Emailing; using LINGYUN.Abp.ExceptionHandling.Emailing;
@ -15,8 +16,10 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.OpenApi.Models; using Microsoft.OpenApi.Models;
using OpenTelemetry.Resources; using OpenTelemetry.Resources;
using OpenTelemetry.Trace; using OpenTelemetry.Trace;
using Quartz;
using StackExchange.Redis; using StackExchange.Redis;
using System; using System;
using System.Collections.Generic;
using System.Text.Encodings.Web; using System.Text.Encodings.Web;
using System.Text.Unicode; using System.Text.Unicode;
using Volo.Abp; using Volo.Abp;
@ -28,6 +31,7 @@ using Volo.Abp.Json;
using Volo.Abp.Json.SystemTextJson; using Volo.Abp.Json.SystemTextJson;
using Volo.Abp.Localization; using Volo.Abp.Localization;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.Quartz;
using Volo.Abp.Threading; using Volo.Abp.Threading;
using Volo.Abp.VirtualFileSystem; using Volo.Abp.VirtualFileSystem;
@ -76,6 +80,34 @@ public partial class WebhooksManagementHttpApiHostModule
}); });
} }
private void PreConfigureQuartz(IConfiguration configuration)
{
PreConfigure<AbpQuartzOptions>(options =>
{
// 如果使用持久化存储, 则配置quartz持久层
if (configuration.GetSection("Quartz:UsePersistentStore").Get<bool>())
{
var settings = configuration.GetSection("Quartz:Properties").Get<Dictionary<string, string>>();
if (settings != null)
{
foreach (var setting in settings)
{
options.Properties[setting.Key] = setting.Value;
}
}
options.Configurator += (config) =>
{
config.UsePersistentStore(store =>
{
store.UseProperties = false;
store.UseJsonSerializer();
});
};
}
});
}
private void ConfigureDbContext() private void ConfigureDbContext()
{ {
// 配置Ef // 配置Ef
@ -90,6 +122,14 @@ public partial class WebhooksManagementHttpApiHostModule
}); });
} }
private void ConfigureBackgroundTasks()
{
Configure<AbpBackgroundTasksOptions>(options =>
{
options.NodeName = ApplicationName;
});
}
private void ConfigureJsonSerializer() private void ConfigureJsonSerializer()
{ {
// 解决某些不支持类型的序列化 // 解决某些不支持类型的序列化

9
aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/WebhooksManagementHttpApiHostModule.cs

@ -1,12 +1,15 @@
using DotNetCore.CAP; using DotNetCore.CAP;
using LINGYUN.Abp.AspNetCore.Mvc.Wrapper; using LINGYUN.Abp.AspNetCore.Mvc.Wrapper;
using LINGYUN.Abp.AuditLogging.Elasticsearch; using LINGYUN.Abp.AuditLogging.Elasticsearch;
using LINGYUN.Abp.BackgroundTasks.ExceptionHandling;
using LINGYUN.Abp.BackgroundTasks.Quartz;
using LINGYUN.Abp.EventBus.CAP; using LINGYUN.Abp.EventBus.CAP;
using LINGYUN.Abp.ExceptionHandling.Emailing; using LINGYUN.Abp.ExceptionHandling.Emailing;
using LINGYUN.Abp.LocalizationManagement.EntityFrameworkCore; using LINGYUN.Abp.LocalizationManagement.EntityFrameworkCore;
using LINGYUN.Abp.Saas.EntityFrameworkCore; using LINGYUN.Abp.Saas.EntityFrameworkCore;
using LINGYUN.Abp.Serilog.Enrichers.Application; using LINGYUN.Abp.Serilog.Enrichers.Application;
using LINGYUN.Abp.Serilog.Enrichers.UniqueId; using LINGYUN.Abp.Serilog.Enrichers.UniqueId;
using LINGYUN.Abp.TaskManagement.EntityFrameworkCore;
using LINGYUN.Abp.Webhooks.Identity; using LINGYUN.Abp.Webhooks.Identity;
using LINGYUN.Abp.Webhooks.Saas; using LINGYUN.Abp.Webhooks.Saas;
using LINGYUN.Abp.WebhooksManagement; using LINGYUN.Abp.WebhooksManagement;
@ -42,6 +45,9 @@ namespace LY.MicroService.WebhooksManagement;
typeof(WebhooksManagementEntityFrameworkCoreModule), typeof(WebhooksManagementEntityFrameworkCoreModule),
typeof(AbpWebhooksIdentityModule), typeof(AbpWebhooksIdentityModule),
typeof(AbpWebhooksSaasModule), typeof(AbpWebhooksSaasModule),
typeof(AbpBackgroundTasksQuartzModule),
typeof(AbpBackgroundTasksExceptionHandlingModule),
typeof(TaskManagementEntityFrameworkCoreModule),
typeof(AbpEntityFrameworkCoreMySQLModule), typeof(AbpEntityFrameworkCoreMySQLModule),
typeof(AbpAspNetCoreAuthenticationJwtBearerModule), typeof(AbpAspNetCoreAuthenticationJwtBearerModule),
typeof(AbpEmailingExceptionHandlingModule), typeof(AbpEmailingExceptionHandlingModule),
@ -68,6 +74,7 @@ public partial class WebhooksManagementHttpApiHostModule : AbpModule
PreConfigureApp(); PreConfigureApp();
PreConfigureFeature(); PreConfigureFeature();
PreConfigureCAP(configuration); PreConfigureCAP(configuration);
PreConfigureQuartz(configuration);
} }
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
@ -79,6 +86,7 @@ public partial class WebhooksManagementHttpApiHostModule : AbpModule
ConfigureDbContext(); ConfigureDbContext();
ConfigureLocalization(); ConfigureLocalization();
ConfigureJsonSerializer(); ConfigureJsonSerializer();
ConfigureBackgroundTasks();
ConfigureExceptionHandling(); ConfigureExceptionHandling();
ConfigureVirtualFileSystem(); ConfigureVirtualFileSystem();
ConfigureCaching(configuration); ConfigureCaching(configuration);
@ -89,7 +97,6 @@ public partial class WebhooksManagementHttpApiHostModule : AbpModule
ConfigureDistributedLock(context.Services, configuration); ConfigureDistributedLock(context.Services, configuration);
ConfigureSeedWorker(context.Services, hostingEnvironment.IsDevelopment()); ConfigureSeedWorker(context.Services, hostingEnvironment.IsDevelopment());
ConfigureSecurity(context.Services, configuration, hostingEnvironment.IsDevelopment()); ConfigureSecurity(context.Services, configuration, hostingEnvironment.IsDevelopment());
context.Services.AddAlwaysAllowAuthorization(); context.Services.AddAlwaysAllowAuthorization();
} }

15
aspnet-core/services/LY.MicroService.WebhooksManagement.HttpApi.Host/appsettings.Development.json

@ -13,6 +13,7 @@
"ConnectionStrings": { "ConnectionStrings": {
"Default": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456", "Default": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
"WebhooksManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456", "WebhooksManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
"TaskManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
"AbpSaas": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456", "AbpSaas": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
"AbpFeatureManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456", "AbpFeatureManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
"AbpPermissionManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456", "AbpPermissionManagement": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
@ -60,6 +61,20 @@
"VirtualHost": "/" "VirtualHost": "/"
} }
}, },
"Quartz": {
"UsePersistentStore": false,
"Properties": {
"quartz.jobStore.dataSource": "tkm",
"quartz.jobStore.type": "Quartz.Impl.AdoJobStore.JobStoreTX,Quartz",
"quartz.jobStore.driverDelegateType": "Quartz.Impl.AdoJobStore.MySQLDelegate,Quartz",
"quartz.dataSource.tkm.connectionString": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456",
"quartz.dataSource.tkm.connectionStringName": "TaskManagement",
"quartz.dataSource.tkm.provider": "MySqlConnector",
"quartz.jobStore.clustered": "true",
"quartz.serializer.type": "json",
"quartz.scheduler.instanceName": "webhook"
}
},
"Redis": { "Redis": {
"Configuration": "127.0.0.1,defaultDatabase=10", "Configuration": "127.0.0.1,defaultDatabase=10",
"InstanceName": "LINGYUN.Abp.Application" "InstanceName": "LINGYUN.Abp.Application"

Loading…
Cancel
Save