37 changed files with 1162 additions and 218 deletions
@ -0,0 +1,19 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
|
|||
<Import Project="..\..\..\configureawait.props" /> |
|||
<Import Project="..\..\..\common.props" /> |
|||
|
|||
<PropertyGroup> |
|||
<TargetFramework>netstandard2.0</TargetFramework> |
|||
<RootNamespace /> |
|||
</PropertyGroup> |
|||
|
|||
<ItemGroup> |
|||
<PackageReference Include="Volo.Abp.Hangfire" Version="$(VoloAbpPackageVersion)" /> |
|||
</ItemGroup> |
|||
|
|||
<ItemGroup> |
|||
<ProjectReference Include="..\LINGYUN.Abp.BackgroundTasks\LINGYUN.Abp.BackgroundTasks.csproj" /> |
|||
</ItemGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,18 @@ |
|||
using Hangfire; |
|||
using Volo.Abp.Hangfire; |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace LINGYUN.Abp.BackgroundTasks.Hangfire; |
|||
|
|||
[DependsOn(typeof(AbpBackgroundTasksModule))] |
|||
[DependsOn(typeof(AbpHangfireModule))] |
|||
public class AbpBackgroundTasksHangfireModule : AbpModule |
|||
{ |
|||
public override void ConfigureServices(ServiceConfigurationContext context) |
|||
{ |
|||
context.Services.AddHangfire((provider, configuration) => |
|||
{ |
|||
configuration.UseFilter(new HangfireJobExecutedAttribute(provider)); |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,96 @@ |
|||
using Hangfire.Common; |
|||
using Hangfire.Server; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using System; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace LINGYUN.Abp.BackgroundTasks.Hangfire; |
|||
|
|||
public class HangfireJobExecutedAttribute : JobFilterAttribute, IServerFilter |
|||
{ |
|||
public ILogger<HangfireJobExecutedAttribute> Logger { protected get; set; } |
|||
public IServiceProvider ServiceProvider { get; set; } |
|||
|
|||
public HangfireJobExecutedAttribute() |
|||
{ |
|||
Logger = NullLogger<HangfireJobExecutedAttribute>.Instance; |
|||
} |
|||
|
|||
public HangfireJobExecutedAttribute(IServiceProvider serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
|
|||
Logger = NullLogger<HangfireJobExecutedAttribute>.Instance; |
|||
} |
|||
|
|||
public async void OnPerformed(PerformedContext filterContext) |
|||
{ |
|||
if (Guid.TryParse(filterContext.BackgroundJob.Id, out var jobUUId)) |
|||
{ |
|||
try |
|||
{ |
|||
var jobEventProvider = ServiceProvider.GetRequiredService<IJobEventProvider>(); |
|||
var jobEventList = jobEventProvider.GetAll(); |
|||
if (!jobEventList.Any()) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
using var scope = ServiceProvider.CreateScope(); |
|||
|
|||
var jobGroup = filterContext.Connection |
|||
.GetJobParameter(filterContext.BackgroundJob.Id, nameof(JobInfo.Group)); |
|||
var jobName = filterContext.Connection |
|||
.GetJobParameter(filterContext.BackgroundJob.Id, nameof(JobInfo.Name)); |
|||
|
|||
var jobEventData = new JobEventData( |
|||
jobUUId, |
|||
filterContext.BackgroundJob.Job.Type, |
|||
jobGroup, |
|||
jobName) |
|||
{ |
|||
Result = filterContext.Result?.ToString() |
|||
}; |
|||
|
|||
var eventContext = new JobEventContext( |
|||
scope.ServiceProvider, |
|||
jobEventData); |
|||
|
|||
var index = 0; |
|||
var taskList = new Task[jobEventList.Count]; |
|||
foreach (var jobEvent in jobEventList) |
|||
{ |
|||
taskList[index] = jobEvent.OnJobBeforeExecuted(eventContext); |
|||
index++; |
|||
} |
|||
|
|||
await Task.WhenAll(taskList); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
Logger.LogError($"The event before the task execution is abnormal:{ex}"); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public async void OnPerforming(PerformingContext filterContext) |
|||
{ |
|||
var lockTime = filterContext.Connection |
|||
.GetJobParameter(filterContext.BackgroundJob.Id, nameof(JobInfo.LockTimeOut)); |
|||
|
|||
if (!lockTime.IsNullOrWhiteSpace() && int.TryParse(lockTime, out var time) && time > 0) |
|||
{ |
|||
var jobLockProvider = ServiceProvider.GetRequiredService<IJobLockProvider>(); |
|||
if (!await jobLockProvider.TryLockAsync( |
|||
filterContext.BackgroundJob.Id, |
|||
time, |
|||
filterContext.CancellationToken.ShutdownToken)) |
|||
{ |
|||
filterContext.Canceled = true; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,122 @@ |
|||
using Hangfire; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Collections.Immutable; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace LINGYUN.Abp.BackgroundTasks.Hangfire; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class HangfireJobScheduler : IJobScheduler, ISingletonDependency |
|||
{ |
|||
public ILogger<HangfireJobScheduler> Logger { protected get; set; } |
|||
protected AbpBackgroundTasksOptions Options { get; } |
|||
|
|||
protected JobStorage JobStorage { get; } |
|||
protected IRecurringJobManager RecurringJobManager { get; } |
|||
|
|||
public HangfireJobScheduler( |
|||
JobStorage jobStorage, |
|||
IOptions<AbpBackgroundTasksOptions> options) |
|||
{ |
|||
Options = options.Value; |
|||
|
|||
JobStorage = jobStorage; |
|||
RecurringJobManager = new RecurringJobManager(jobStorage); |
|||
|
|||
Logger = NullLogger<HangfireJobScheduler>.Instance; |
|||
} |
|||
|
|||
public Task<bool> ExistsAsync(JobInfo job) |
|||
{ |
|||
var monitor = JobStorage.GetMonitoringApi(); |
|||
|
|||
monitor.JobDetails(job.); |
|||
} |
|||
|
|||
public Task PauseAsync(JobInfo job) |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public virtual Task<bool> QueueAsync(JobInfo job) |
|||
{ |
|||
var jobType = Options.JobProviders.GetOrDefault(job.Type) ?? Type.GetType(job.Type, false); |
|||
if (jobType == null) |
|||
{ |
|||
Logger.LogWarning($"The task: {job.Group} - {job.Name}: {job.Type} is not registered and cannot create an instance of the performer type."); |
|||
return Task.FromResult(false); |
|||
} |
|||
var jobData = job.Args; |
|||
jobData[nameof(JobInfo.Id)] = job.Id; |
|||
jobData[nameof(JobInfo.Group)] = job.Group; |
|||
jobData[nameof(JobInfo.Name)] = job.Name; |
|||
|
|||
switch (job.JobType) |
|||
{ |
|||
case JobType.Once: |
|||
var jobId = BackgroundJob.Schedule<HangfireJobSimpleAdapter>( |
|||
adapter => adapter.ExecuteAsync(jobType, jobData.ToImmutableDictionary()), |
|||
TimeSpan.FromSeconds(job.Interval)); |
|||
job.Args["hangfire"] = jobId; |
|||
break; |
|||
case JobType.Persistent: |
|||
var minuteInterval = job.Interval / 60; |
|||
if (minuteInterval < 1) |
|||
{ |
|||
minuteInterval = 1; |
|||
} |
|||
RecurringJob.AddOrUpdate<HangfireJobSimpleAdapter>( |
|||
adapter => adapter.ExecuteAsync(jobType, jobData.ToImmutableDictionary()), |
|||
Cron.MinuteInterval(minuteInterval)); |
|||
break; |
|||
case JobType.Period: |
|||
RecurringJob.AddOrUpdate<HangfireJobSimpleAdapter>( |
|||
adapter => adapter.ExecuteAsync(jobType, jobData.ToImmutableDictionary()), |
|||
job.Cron, |
|||
queue: job.Group); |
|||
break; |
|||
} |
|||
|
|||
return Task.FromResult(true); |
|||
} |
|||
|
|||
public Task QueuesAsync(IEnumerable<JobInfo> jobs) |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public Task<bool> RemoveAsync(JobInfo job) |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public Task ResumeAsync(JobInfo job) |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public Task<bool> ShutdownAsync() |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public Task<bool> StartAsync() |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public Task<bool> StopAsync() |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public Task TriggerAsync(JobInfo job) |
|||
{ |
|||
throw new NotImplementedException(); |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace LINGYUN.Abp.BackgroundTasks.Hangfire; |
|||
|
|||
public class HangfireJobSimpleAdapter |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
|
|||
public HangfireJobSimpleAdapter( |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
} |
|||
|
|||
public async virtual Task<object> ExecuteAsync(Type jobRunnableType, IReadOnlyDictionary<string, object> jobData) |
|||
{ |
|||
using var scope = ServiceProvider.CreateScope(); |
|||
var jobExecuter = scope.ServiceProvider.GetRequiredService<IJobRunnableExecuter>(); |
|||
|
|||
var jobContext = new JobRunnableContext( |
|||
jobRunnableType, |
|||
ServiceProvider, |
|||
jobData); |
|||
|
|||
await jobExecuter.ExecuteAsync(jobContext); |
|||
|
|||
return jobContext.Result; |
|||
} |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace LINGYUN.Abp.TaskManagement; |
|||
|
|||
public class BackgroundJobInfoBatchInput |
|||
{ |
|||
public List<Guid> JobIds { get; set; } = new List<Guid>(); |
|||
} |
|||
@ -0,0 +1,201 @@ |
|||
// <auto-generated />
|
|||
using System; |
|||
using LY.MicroService.TaskManagement.EntityFrameworkCore; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.EntityFrameworkCore.Infrastructure; |
|||
using Microsoft.EntityFrameworkCore.Migrations; |
|||
using Microsoft.EntityFrameworkCore.Storage.ValueConversion; |
|||
using Volo.Abp.EntityFrameworkCore; |
|||
|
|||
#nullable disable |
|||
|
|||
namespace LY.MicroService.TaskManagement.Migrations |
|||
{ |
|||
[DbContext(typeof(TaskManagementMigrationsDbContext))] |
|||
[Migration("20220112093435_Add-Support-Multi-Tenancy-With-Background-Jobs")] |
|||
partial class AddSupportMultiTenancyWithBackgroundJobs |
|||
{ |
|||
protected override void BuildTargetModel(ModelBuilder modelBuilder) |
|||
{ |
|||
#pragma warning disable 612, 618
|
|||
modelBuilder |
|||
.HasAnnotation("_Abp_DatabaseProvider", EfCoreDatabaseProvider.MySql) |
|||
.HasAnnotation("ProductVersion", "6.0.1") |
|||
.HasAnnotation("Relational:MaxIdentifierLength", 64); |
|||
|
|||
modelBuilder.Entity("LINGYUN.Abp.TaskManagement.BackgroundJobInfo", b => |
|||
{ |
|||
b.Property<Guid>("Id") |
|||
.ValueGeneratedOnAdd() |
|||
.HasColumnType("char(36)"); |
|||
|
|||
b.Property<string>("Args") |
|||
.HasColumnType("longtext") |
|||
.HasColumnName("Args"); |
|||
|
|||
b.Property<DateTime>("BeginTime") |
|||
.HasColumnType("datetime(6)"); |
|||
|
|||
b.Property<string>("ConcurrencyStamp") |
|||
.IsConcurrencyToken() |
|||
.HasMaxLength(40) |
|||
.HasColumnType("varchar(40)") |
|||
.HasColumnName("ConcurrencyStamp"); |
|||
|
|||
b.Property<DateTime>("CreationTime") |
|||
.HasColumnType("datetime(6)") |
|||
.HasColumnName("CreationTime"); |
|||
|
|||
b.Property<Guid?>("CreatorId") |
|||
.HasColumnType("char(36)") |
|||
.HasColumnName("CreatorId"); |
|||
|
|||
b.Property<string>("Cron") |
|||
.HasMaxLength(50) |
|||
.HasColumnType("varchar(50)") |
|||
.HasColumnName("Cron"); |
|||
|
|||
b.Property<string>("Description") |
|||
.HasMaxLength(255) |
|||
.HasColumnType("varchar(255)") |
|||
.HasColumnName("Description"); |
|||
|
|||
b.Property<DateTime?>("EndTime") |
|||
.HasColumnType("datetime(6)"); |
|||
|
|||
b.Property<string>("ExtraProperties") |
|||
.HasColumnType("longtext") |
|||
.HasColumnName("ExtraProperties"); |
|||
|
|||
b.Property<string>("Group") |
|||
.IsRequired() |
|||
.HasMaxLength(50) |
|||
.HasColumnType("varchar(50)") |
|||
.HasColumnName("Group"); |
|||
|
|||
b.Property<int>("Interval") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<bool>("IsAbandoned") |
|||
.HasColumnType("tinyint(1)"); |
|||
|
|||
b.Property<bool>("IsEnabled") |
|||
.HasColumnType("tinyint(1)"); |
|||
|
|||
b.Property<int>("JobType") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<DateTime?>("LastModificationTime") |
|||
.HasColumnType("datetime(6)") |
|||
.HasColumnName("LastModificationTime"); |
|||
|
|||
b.Property<Guid?>("LastModifierId") |
|||
.HasColumnType("char(36)") |
|||
.HasColumnName("LastModifierId"); |
|||
|
|||
b.Property<DateTime?>("LastRunTime") |
|||
.HasColumnType("datetime(6)"); |
|||
|
|||
b.Property<int>("LockTimeOut") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<int>("MaxCount") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<int>("MaxTryCount") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<string>("Name") |
|||
.IsRequired() |
|||
.HasMaxLength(100) |
|||
.HasColumnType("varchar(100)") |
|||
.HasColumnName("Name"); |
|||
|
|||
b.Property<DateTime?>("NextRunTime") |
|||
.HasColumnType("datetime(6)"); |
|||
|
|||
b.Property<int>("Priority") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<string>("Result") |
|||
.HasMaxLength(1000) |
|||
.HasColumnType("varchar(1000)") |
|||
.HasColumnName("Result"); |
|||
|
|||
b.Property<int>("Status") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<Guid?>("TenantId") |
|||
.HasColumnType("char(36)") |
|||
.HasColumnName("TenantId"); |
|||
|
|||
b.Property<int>("TriggerCount") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<int>("TryCount") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<string>("Type") |
|||
.IsRequired() |
|||
.HasMaxLength(1000) |
|||
.HasColumnType("varchar(1000)") |
|||
.HasColumnName("Type"); |
|||
|
|||
b.HasKey("Id"); |
|||
|
|||
b.HasIndex("Name", "Group"); |
|||
|
|||
b.ToTable("TK_BackgroundJobs", (string)null); |
|||
}); |
|||
|
|||
modelBuilder.Entity("LINGYUN.Abp.TaskManagement.BackgroundJobLog", b => |
|||
{ |
|||
b.Property<long>("Id") |
|||
.ValueGeneratedOnAdd() |
|||
.HasColumnType("bigint"); |
|||
|
|||
b.Property<string>("Exception") |
|||
.HasMaxLength(2000) |
|||
.HasColumnType("varchar(2000)") |
|||
.HasColumnName("Exception"); |
|||
|
|||
b.Property<string>("JobGroup") |
|||
.HasMaxLength(50) |
|||
.HasColumnType("varchar(50)") |
|||
.HasColumnName("JobGroup"); |
|||
|
|||
b.Property<Guid?>("JobId") |
|||
.HasColumnType("char(36)"); |
|||
|
|||
b.Property<string>("JobName") |
|||
.HasMaxLength(100) |
|||
.HasColumnType("varchar(100)") |
|||
.HasColumnName("JobName"); |
|||
|
|||
b.Property<string>("JobType") |
|||
.HasMaxLength(1000) |
|||
.HasColumnType("varchar(1000)") |
|||
.HasColumnName("JobType"); |
|||
|
|||
b.Property<string>("Message") |
|||
.HasMaxLength(1000) |
|||
.HasColumnType("varchar(1000)") |
|||
.HasColumnName("Message"); |
|||
|
|||
b.Property<DateTime>("RunTime") |
|||
.HasColumnType("datetime(6)"); |
|||
|
|||
b.Property<Guid?>("TenantId") |
|||
.HasColumnType("char(36)") |
|||
.HasColumnName("TenantId"); |
|||
|
|||
b.HasKey("Id"); |
|||
|
|||
b.HasIndex("JobGroup", "JobName"); |
|||
|
|||
b.ToTable("TK_BackgroundJobLogs", (string)null); |
|||
}); |
|||
#pragma warning restore 612, 618
|
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
using System; |
|||
using Microsoft.EntityFrameworkCore.Migrations; |
|||
|
|||
#nullable disable |
|||
|
|||
namespace LY.MicroService.TaskManagement.Migrations |
|||
{ |
|||
public partial class AddSupportMultiTenancyWithBackgroundJobs : Migration |
|||
{ |
|||
protected override void Up(MigrationBuilder migrationBuilder) |
|||
{ |
|||
migrationBuilder.AddColumn<Guid>( |
|||
name: "TenantId", |
|||
table: "TK_BackgroundJobs", |
|||
type: "char(36)", |
|||
nullable: true, |
|||
collation: "ascii_general_ci"); |
|||
|
|||
migrationBuilder.AddColumn<Guid>( |
|||
name: "TenantId", |
|||
table: "TK_BackgroundJobLogs", |
|||
type: "char(36)", |
|||
nullable: true, |
|||
collation: "ascii_general_ci"); |
|||
} |
|||
|
|||
protected override void Down(MigrationBuilder migrationBuilder) |
|||
{ |
|||
migrationBuilder.DropColumn( |
|||
name: "TenantId", |
|||
table: "TK_BackgroundJobs"); |
|||
|
|||
migrationBuilder.DropColumn( |
|||
name: "TenantId", |
|||
table: "TK_BackgroundJobLogs"); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue