Browse Source

Merge pull request #584 from colinin/5.2.0

feat(tasks): 手动处理通过事件变更作业状态的工作单元
pull/645/head 5.2.0
yx lin 4 years ago
committed by GitHub
parent
commit
6a1315cd08
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs

14
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobStore.cs

@ -16,6 +16,7 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
{ {
protected IObjectMapper ObjectMapper { get; } protected IObjectMapper ObjectMapper { get; }
protected ICurrentTenant CurrentTenant { get; } protected ICurrentTenant CurrentTenant { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IBackgroundJobInfoRepository JobInfoRepository { get; } protected IBackgroundJobInfoRepository JobInfoRepository { get; }
protected IBackgroundJobLogRepository JobLogRepository { get; } protected IBackgroundJobLogRepository JobLogRepository { get; }
@ -24,12 +25,14 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
public BackgroundJobStore( public BackgroundJobStore(
IObjectMapper objectMapper, IObjectMapper objectMapper,
ICurrentTenant currentTenant, ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IBackgroundJobInfoRepository jobInfoRepository, IBackgroundJobInfoRepository jobInfoRepository,
IBackgroundJobLogRepository jobLogRepository, IBackgroundJobLogRepository jobLogRepository,
IOptions<AbpBackgroundTasksOptions> options) IOptions<AbpBackgroundTasksOptions> options)
{ {
ObjectMapper = objectMapper; ObjectMapper = objectMapper;
CurrentTenant = currentTenant; CurrentTenant = currentTenant;
UnitOfWorkManager = unitOfWorkManager;
JobInfoRepository = jobInfoRepository; JobInfoRepository = jobInfoRepository;
JobLogRepository = jobLogRepository; JobLogRepository = jobLogRepository;
Options = options.Value; Options = options.Value;
@ -54,9 +57,9 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
return await JobInfoRepository.FindJobAsync(jobId); return await JobInfoRepository.FindJobAsync(jobId);
} }
[UnitOfWork]
public async virtual Task StoreAsync(JobInfo jobInfo) public async virtual Task StoreAsync(JobInfo jobInfo)
{ {
using var unitOfWork = UnitOfWorkManager.Begin();
using (CurrentTenant.Change(jobInfo.TenantId)) using (CurrentTenant.Change(jobInfo.TenantId))
{ {
var backgroundJobInfo = await JobInfoRepository.FindAsync(jobInfo.Id); var backgroundJobInfo = await JobInfoRepository.FindAsync(jobInfo.Id);
@ -115,12 +118,13 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
await JobInfoRepository.InsertAsync(backgroundJobInfo); await JobInfoRepository.InsertAsync(backgroundJobInfo);
} }
await unitOfWork.SaveChangesAsync();
} }
} }
[UnitOfWork]
public async virtual Task StoreLogAsync(JobEventData eventData) public async virtual Task StoreLogAsync(JobEventData eventData)
{ {
using var unitOfWork = UnitOfWorkManager.Begin();
using (CurrentTenant.Change(eventData.TenantId)) using (CurrentTenant.Change(eventData.TenantId))
{ {
var jogLog = new BackgroundJobLog( var jogLog = new BackgroundJobLog(
@ -138,15 +142,17 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
eventData.Exception); eventData.Exception);
await JobLogRepository.InsertAsync(jogLog); await JobLogRepository.InsertAsync(jogLog);
await unitOfWork.SaveChangesAsync();
} }
} }
[UnitOfWork]
public async virtual Task CleanupAsync( public async virtual Task CleanupAsync(
int maxResultCount, int maxResultCount,
TimeSpan jobExpiratime, TimeSpan jobExpiratime,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
using var unitOfWork = UnitOfWorkManager.Begin();
var jobs = await JobInfoRepository.GetExpiredJobsAsync( var jobs = await JobInfoRepository.GetExpiredJobsAsync(
Options.NodeName, Options.NodeName,
maxResultCount, maxResultCount,
@ -154,6 +160,8 @@ public class BackgroundJobStore : IJobStore, ITransientDependency
cancellationToken); cancellationToken);
await JobInfoRepository.DeleteManyAsync(jobs, cancellationToken: cancellationToken); await JobInfoRepository.DeleteManyAsync(jobs, cancellationToken: cancellationToken);
await unitOfWork.SaveChangesAsync();
} }
protected virtual Exception GetSourceException(Exception exception) protected virtual Exception GetSourceException(Exception exception)

Loading…
Cancel
Save