16 changed files with 334 additions and 37 deletions
@ -0,0 +1,10 @@ |
|||||
|
using Quartz; |
||||
|
|
||||
|
namespace LINGYUN.Abp.BackgroundTasks.Quartz; |
||||
|
|
||||
|
public interface IQuartzKeyBuilder |
||||
|
{ |
||||
|
JobKey CreateJobKey(JobInfo jobInfo); |
||||
|
|
||||
|
TriggerKey CreateTriggerKey(JobInfo jobInfo); |
||||
|
} |
||||
@ -0,0 +1,35 @@ |
|||||
|
using Quartz; |
||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace LINGYUN.Abp.BackgroundTasks.Quartz; |
||||
|
|
||||
|
public class QuartzKeyBuilder : IQuartzKeyBuilder, ISingletonDependency |
||||
|
{ |
||||
|
protected ICurrentTenant CurrentTenant { get; } |
||||
|
|
||||
|
public QuartzKeyBuilder(ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
CurrentTenant = currentTenant; |
||||
|
} |
||||
|
|
||||
|
public JobKey CreateJobKey(JobInfo jobInfo) |
||||
|
{ |
||||
|
var name = jobInfo.Id.ToString(); |
||||
|
var group = CurrentTenant.IsAvailable |
||||
|
? $"{CurrentTenant.Id}:{jobInfo.Group}" |
||||
|
: $"Default:{jobInfo.Group}"; |
||||
|
|
||||
|
return new JobKey(name, group); |
||||
|
} |
||||
|
|
||||
|
public TriggerKey CreateTriggerKey(JobInfo jobInfo) |
||||
|
{ |
||||
|
var name = jobInfo.Id.ToString(); |
||||
|
var group = CurrentTenant.IsAvailable |
||||
|
? $"{CurrentTenant.Id}:{jobInfo.Group}" |
||||
|
: $"Default:{jobInfo.Group}"; |
||||
|
|
||||
|
return new TriggerKey(name, group); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,140 @@ |
|||||
|
using LINGYUN.Abp.BackgroundTasks; |
||||
|
using LINGYUN.Abp.BackgroundTasks.Internal; |
||||
|
using LINGYUN.Abp.Data.DbMigrator; |
||||
|
using LINGYUN.Abp.MultiTenancy; |
||||
|
using LY.MicroService.TaskManagement.EntityFrameworkCore; |
||||
|
using Microsoft.EntityFrameworkCore; |
||||
|
using Microsoft.Extensions.Logging; |
||||
|
using Microsoft.Extensions.Options; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Threading.Tasks; |
||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using Volo.Abp.Domain.Entities.Events.Distributed; |
||||
|
using Volo.Abp.EventBus.Distributed; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
using Volo.Abp.TenantManagement; |
||||
|
using Volo.Abp.Uow; |
||||
|
|
||||
|
namespace LY.MicroService.TaskManagement.EventBus.Handlers |
||||
|
{ |
||||
|
public class TenantSynchronizer : |
||||
|
IDistributedEventHandler<CreateEventData>, |
||||
|
IDistributedEventHandler<EntityDeletedEto<TenantEto>>, |
||||
|
ITransientDependency |
||||
|
{ |
||||
|
protected ILogger<TenantSynchronizer> Logger { get; } |
||||
|
|
||||
|
protected ICurrentTenant CurrentTenant { get; } |
||||
|
protected IUnitOfWorkManager UnitOfWorkManager { get; } |
||||
|
protected IDbSchemaMigrator DbSchemaMigrator { get; } |
||||
|
protected AbpBackgroundTasksOptions Options { get; } |
||||
|
protected IJobScheduler JobScheduler { get; } |
||||
|
public TenantSynchronizer( |
||||
|
ICurrentTenant currentTenant, |
||||
|
IUnitOfWorkManager unitOfWorkManager, |
||||
|
IDbSchemaMigrator dbSchemaMigrator, |
||||
|
IOptions<AbpBackgroundTasksOptions> options, |
||||
|
IJobScheduler jobScheduler, |
||||
|
ILogger<TenantSynchronizer> logger) |
||||
|
{ |
||||
|
CurrentTenant = currentTenant; |
||||
|
UnitOfWorkManager = unitOfWorkManager; |
||||
|
DbSchemaMigrator = dbSchemaMigrator; |
||||
|
JobScheduler = jobScheduler; |
||||
|
Options = options.Value; |
||||
|
|
||||
|
Logger = logger; |
||||
|
} |
||||
|
|
||||
|
public async Task HandleEventAsync(EntityDeletedEto<TenantEto> eventData) |
||||
|
{ |
||||
|
// 租户删除时移除轮询作业
|
||||
|
var pollingJob = BuildPollingJobInfo(eventData.Entity.Id, eventData.Entity.Name); |
||||
|
await JobScheduler.RemoveAsync(pollingJob); |
||||
|
|
||||
|
var cleaningJob = BuildCleaningJobInfo(eventData.Entity.Id, eventData.Entity.Name); |
||||
|
await JobScheduler.RemoveAsync(cleaningJob); |
||||
|
} |
||||
|
|
||||
|
public async Task HandleEventAsync(CreateEventData eventData) |
||||
|
{ |
||||
|
await MigrateAsync(eventData); |
||||
|
|
||||
|
// 持久层介入之后提供对于租户的后台工作者轮询作业
|
||||
|
await QueueBackgroundJobAsync(eventData); |
||||
|
} |
||||
|
|
||||
|
private async Task QueueBackgroundJobAsync(CreateEventData eventData) |
||||
|
{ |
||||
|
var pollingJob = BuildPollingJobInfo(eventData.Id, eventData.Name); |
||||
|
await JobScheduler.QueueAsync(pollingJob); |
||||
|
|
||||
|
var cleaningJob = BuildCleaningJobInfo(eventData.Id, eventData.Name); |
||||
|
await JobScheduler.QueueAsync(cleaningJob); |
||||
|
} |
||||
|
|
||||
|
private async Task MigrateAsync(CreateEventData eventData) |
||||
|
{ |
||||
|
using (var unitOfWork = UnitOfWorkManager.Begin()) |
||||
|
{ |
||||
|
using (CurrentTenant.Change(eventData.Id, eventData.Name)) |
||||
|
{ |
||||
|
Logger.LogInformation("Migrating the new tenant database with localization.."); |
||||
|
// 迁移租户数据
|
||||
|
await DbSchemaMigrator.MigrateAsync<TaskManagementMigrationsDbContext>( |
||||
|
(connectionString, builder) => |
||||
|
{ |
||||
|
builder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString)); |
||||
|
|
||||
|
return new TaskManagementMigrationsDbContext(builder.Options); |
||||
|
}); |
||||
|
await unitOfWork.SaveChangesAsync(); |
||||
|
|
||||
|
Logger.LogInformation("Migrated the new tenant database with localization."); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private JobInfo BuildPollingJobInfo(Guid tenantId, string tenantName) |
||||
|
{ |
||||
|
return new JobInfo |
||||
|
{ |
||||
|
Id = tenantId, |
||||
|
Name = nameof(BackgroundPollingJob), |
||||
|
Group = "Polling", |
||||
|
Description = "Polling tasks to be executed", |
||||
|
Args = new Dictionary<string, object>() { { nameof(JobInfo.TenantId), tenantId } }, |
||||
|
Status = JobStatus.Running, |
||||
|
BeginTime = DateTime.Now, |
||||
|
CreationTime = DateTime.Now, |
||||
|
Cron = Options.JobFetchCronExpression, |
||||
|
JobType = JobType.Period, |
||||
|
Priority = JobPriority.High, |
||||
|
LockTimeOut = Options.JobFetchLockTimeOut, |
||||
|
TenantId = tenantId, |
||||
|
Type = typeof(BackgroundPollingJob).AssemblyQualifiedName, |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
private JobInfo BuildCleaningJobInfo(Guid tenantId, string tenantName) |
||||
|
{ |
||||
|
return new JobInfo |
||||
|
{ |
||||
|
Id = tenantId, |
||||
|
Name = nameof(BackgroundCleaningJob), |
||||
|
Group = "Cleaning", |
||||
|
Description = "Cleaning tasks to be executed", |
||||
|
Args = new Dictionary<string, object>() { { nameof(JobInfo.TenantId), tenantId } }, |
||||
|
Status = JobStatus.Running, |
||||
|
BeginTime = DateTime.Now, |
||||
|
CreationTime = DateTime.Now, |
||||
|
Cron = Options.JobCleanCronExpression, |
||||
|
JobType = JobType.Period, |
||||
|
Priority = JobPriority.High, |
||||
|
TenantId = tenantId, |
||||
|
Type = typeof(BackgroundCleaningJob).AssemblyQualifiedName, |
||||
|
}; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue