Browse Source

feat(tasks): 发布作业事件让参与节点自行管理作业状态

pull/573/head
cKey 4 years ago
parent
commit
06a705ce75
  1. 7
      aspnet-core/LINGYUN.MicroService.TaskManagement.sln
  2. 3
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/FodyWeavers.xml
  3. 30
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/FodyWeavers.xsd
  4. 19
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN.Abp.BackgroundTasks.EventBus.csproj
  5. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/AbpBackgroundTasksEventBusModule.cs
  6. 15
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobEventData.cs
  7. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobPauseEventData.cs
  8. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobResumeEventData.cs
  9. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobStartEventData.cs
  10. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobStopEventData.cs
  11. 118
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs
  12. 11
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobTriggerEventData.cs
  13. 19
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/BackgroundJobEto.cs
  14. 1
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN.Abp.TaskManagement.Domain.csproj
  15. 185
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs
  16. 60
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobSynchronizer.cs
  17. 1
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/TaskManagementDomainMapperProfile.cs
  18. 11
      aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/TaskManagementDomainModule.cs
  19. 5
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain.Shared/LINGYUN/Abp/WebhooksManagement/WebhookEventEto.cs
  20. 2
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain.Shared/LINGYUN/Abp/WebhooksManagement/WebhookSendAttemptEto.cs
  21. 2
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain.Shared/LINGYUN/Abp/WebhooksManagement/WebhookSubscriptionEto.cs
  22. 2
      aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs

7
aspnet-core/LINGYUN.MicroService.TaskManagement.sln

@ -44,6 +44,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LINGYUN.Abp.TaskManagement.
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LINGYUN.Abp.BackgroundTasks.TaskManagement", "modules\task-management\LINGYUN.Abp.BackgroundTasks.TaskManagement\LINGYUN.Abp.BackgroundTasks.TaskManagement.csproj", "{7937785C-0D28-46B3-A7E7-0B592821A1F2}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LINGYUN.Abp.BackgroundTasks.TaskManagement", "modules\task-management\LINGYUN.Abp.BackgroundTasks.TaskManagement\LINGYUN.Abp.BackgroundTasks.TaskManagement.csproj", "{7937785C-0D28-46B3-A7E7-0B592821A1F2}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LINGYUN.Abp.BackgroundTasks.EventBus", "modules\task-management\LINGYUN.Abp.BackgroundTasks.EventBus\LINGYUN.Abp.BackgroundTasks.EventBus.csproj", "{D17DEF79-635B-478D-89D7-32EAE616869A}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -106,6 +108,10 @@ Global
{7937785C-0D28-46B3-A7E7-0B592821A1F2}.Debug|Any CPU.Build.0 = Debug|Any CPU {7937785C-0D28-46B3-A7E7-0B592821A1F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7937785C-0D28-46B3-A7E7-0B592821A1F2}.Release|Any CPU.ActiveCfg = Release|Any CPU {7937785C-0D28-46B3-A7E7-0B592821A1F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7937785C-0D28-46B3-A7E7-0B592821A1F2}.Release|Any CPU.Build.0 = Release|Any CPU {7937785C-0D28-46B3-A7E7-0B592821A1F2}.Release|Any CPU.Build.0 = Release|Any CPU
{D17DEF79-635B-478D-89D7-32EAE616869A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D17DEF79-635B-478D-89D7-32EAE616869A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D17DEF79-635B-478D-89D7-32EAE616869A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D17DEF79-635B-478D-89D7-32EAE616869A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -125,6 +131,7 @@ Global
{8507BBFA-FE56-4426-BBFA-C92906CB8407} = {C38EB7EF-BAE9-4129-862A-71C652B81775} {8507BBFA-FE56-4426-BBFA-C92906CB8407} = {C38EB7EF-BAE9-4129-862A-71C652B81775}
{56C759D1-6FE6-4111-A2DB-CD65DCE82061} = {C38EB7EF-BAE9-4129-862A-71C652B81775} {56C759D1-6FE6-4111-A2DB-CD65DCE82061} = {C38EB7EF-BAE9-4129-862A-71C652B81775}
{7937785C-0D28-46B3-A7E7-0B592821A1F2} = {385578CC-C0F1-4377-A7A2-682B8F416234} {7937785C-0D28-46B3-A7E7-0B592821A1F2} = {385578CC-C0F1-4377-A7A2-682B8F416234}
{D17DEF79-635B-478D-89D7-32EAE616869A} = {C38EB7EF-BAE9-4129-862A-71C652B81775}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E1FD1F4C-D344-408B-97CF-B6F1F6D7D293} SolutionGuid = {E1FD1F4C-D344-408B-97CF-B6F1F6D7D293}

3
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/FodyWeavers.xml

@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>

30
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/FodyWeavers.xsd

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

19
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN.Abp.BackgroundTasks.EventBus.csproj

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Volo.Abp.EventBus" Version="$(VoloAbpPackageVersion)" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\LINGYUN.Abp.BackgroundTasks\LINGYUN.Abp.BackgroundTasks.csproj" />
</ItemGroup>
</Project>

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/AbpBackgroundTasksEventBusModule.cs

@ -0,0 +1,10 @@
using Volo.Abp.EventBus;
using Volo.Abp.Modularity;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[DependsOn(typeof(AbpEventBusModule))]
[DependsOn(typeof(AbpBackgroundTasksModule))]
public class AbpBackgroundTasksEventBusModule : AbpModule
{
}

15
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobEventData.cs

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using Volo.Abp.EventBus;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job")]
public class JobEventData : IMultiTenant
{
public Guid? TenantId { get; set; }
public string NodeName { get; set; }
public List<string> IdList { get; set; } = new List<string>();
}

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobPauseEventData.cs

@ -0,0 +1,10 @@
using System;
using Volo.Abp.EventBus;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.pause")]
public class JobPauseEventData : JobEventData
{
}

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobResumeEventData.cs

@ -0,0 +1,10 @@
using System;
using Volo.Abp.EventBus;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.resume")]
public class JobResumeEventData : JobEventData
{
}

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobStartEventData.cs

@ -0,0 +1,10 @@
using System;
using Volo.Abp.EventBus;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.start")]
public class JobStartEventData : JobEventData
{
}

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobStopEventData.cs

@ -0,0 +1,10 @@
using System;
using Volo.Abp.EventBus;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.stop")]
public class JobStopEventData : JobEventData
{
}

118
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobSynchronizer.cs

@ -0,0 +1,118 @@
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
public class JobSynchronizer :
IDistributedEventHandler<JobStartEventData>,
IDistributedEventHandler<JobStopEventData>,
IDistributedEventHandler<JobTriggerEventData>,
IDistributedEventHandler<JobPauseEventData>,
IDistributedEventHandler<JobResumeEventData>,
ITransientDependency
{
protected IJobStore JobStore { get; }
protected IJobScheduler JobScheduler { get; }
protected AbpBackgroundTasksOptions BackgroundTasksOptions { get; }
public JobSynchronizer(
IJobStore jobStore,
IJobScheduler jobScheduler,
IOptions<AbpBackgroundTasksOptions> options)
{
JobStore = jobStore;
JobScheduler = jobScheduler;
BackgroundTasksOptions = options.Value;
}
public async virtual Task HandleEventAsync(JobStartEventData eventData)
{
if (string.Equals(eventData.NodeName, BackgroundTasksOptions.NodeName))
{
foreach (var jobId in eventData.IdList)
{
var jobInfo = await JobStore.FindAsync(jobId);
if (jobInfo == null)
{
continue;
}
await JobScheduler.QueueAsync(jobInfo);
}
}
}
public async virtual Task HandleEventAsync(JobStopEventData eventData)
{
if (string.Equals(eventData.NodeName, BackgroundTasksOptions.NodeName))
{
foreach (var jobId in eventData.IdList)
{
var jobInfo = await JobStore.FindAsync(jobId);
if (jobInfo == null)
{
continue;
}
await JobScheduler.RemoveAsync(jobInfo);
}
}
}
public async virtual Task HandleEventAsync(JobTriggerEventData eventData)
{
if (string.Equals(eventData.NodeName, BackgroundTasksOptions.NodeName))
{
foreach (var jobId in eventData.IdList)
{
var jobInfo = await JobStore.FindAsync(jobId);
if (jobInfo == null)
{
continue;
}
await JobScheduler.TriggerAsync(jobInfo);
}
}
}
public async virtual Task HandleEventAsync(JobPauseEventData eventData)
{
if (string.Equals(eventData.NodeName, BackgroundTasksOptions.NodeName))
{
foreach (var jobId in eventData.IdList)
{
var jobInfo = await JobStore.FindAsync(jobId);
if (jobInfo == null)
{
continue;
}
await JobScheduler.PauseAsync(jobInfo);
}
}
}
public async virtual Task HandleEventAsync(JobResumeEventData eventData)
{
if (string.Equals(eventData.NodeName, BackgroundTasksOptions.NodeName))
{
foreach (var jobId in eventData.IdList)
{
var jobInfo = await JobStore.FindAsync(jobId);
if (jobInfo == null)
{
continue;
}
await JobScheduler.ResumeAsync(jobInfo);
}
}
}
}

11
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.EventBus/LINGYUN/Abp/BackgroundTasks/EventBus/JobTriggerEventData.cs

@ -0,0 +1,11 @@
using System;
using Volo.Abp.EventBus;
namespace LINGYUN.Abp.BackgroundTasks.EventBus;
[Serializable]
[EventName("abp.background-tasks.job.trigger")]
public class JobTriggerEventData : JobEventData
{
}

19
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain.Shared/LINGYUN/Abp/TaskManagement/BackgroundJobEto.cs

@ -0,0 +1,19 @@
using LINGYUN.Abp.BackgroundTasks;
using System;
using Volo.Abp.EventBus;
using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.TaskManagement;
[Serializable]
[EventName("abp.tkm.background-job")]
public class BackgroundJobEto : IMultiTenant
{
public string Id { get; set; }
public Guid? TenantId { get; set; }
public bool IsEnabled { get; set; }
public string Name { get; set; }
public string Group { get; set; }
public string NodeName { get; set; }
public JobStatus Status { get; set; }
}

1
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN.Abp.TaskManagement.Domain.csproj

@ -14,6 +14,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\LINGYUN.Abp.BackgroundTasks.EventBus\LINGYUN.Abp.BackgroundTasks.EventBus.csproj" />
<ProjectReference Include="..\LINGYUN.Abp.BackgroundTasks\LINGYUN.Abp.BackgroundTasks.csproj" /> <ProjectReference Include="..\LINGYUN.Abp.BackgroundTasks\LINGYUN.Abp.BackgroundTasks.csproj" />
<ProjectReference Include="..\LINGYUN.Abp.TaskManagement.Domain.Shared\LINGYUN.Abp.TaskManagement.Domain.Shared.csproj" /> <ProjectReference Include="..\LINGYUN.Abp.TaskManagement.Domain.Shared\LINGYUN.Abp.TaskManagement.Domain.Shared.csproj" />
</ItemGroup> </ItemGroup>

185
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobManager.cs

@ -1,27 +1,26 @@
using LINGYUN.Abp.BackgroundTasks; using LINGYUN.Abp.BackgroundTasks;
using LINGYUN.Abp.BackgroundTasks.EventBus;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Domain.Services; using Volo.Abp.Domain.Services;
using Volo.Abp.ObjectMapping; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Uow; using Volo.Abp.Uow;
namespace LINGYUN.Abp.TaskManagement; namespace LINGYUN.Abp.TaskManagement;
public class BackgroundJobManager : DomainService public class BackgroundJobManager : DomainService
{ {
protected IObjectMapper ObjectMapper { get; } protected IDistributedEventBus EventBus { get; }
protected IJobScheduler JobScheduler { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; } protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IBackgroundJobInfoRepository BackgroundJobInfoRepository { get; } protected IBackgroundJobInfoRepository BackgroundJobInfoRepository { get; }
public BackgroundJobManager( public BackgroundJobManager(
IObjectMapper objectMapper, IDistributedEventBus eventBus,
IJobScheduler jobScheduler,
IUnitOfWorkManager unitOfWorkManager, IUnitOfWorkManager unitOfWorkManager,
IBackgroundJobInfoRepository backgroundJobInfoRepository) IBackgroundJobInfoRepository backgroundJobInfoRepository)
{ {
ObjectMapper = objectMapper; EventBus = eventBus;
JobScheduler = jobScheduler;
UnitOfWorkManager = unitOfWorkManager; UnitOfWorkManager = unitOfWorkManager;
BackgroundJobInfoRepository = backgroundJobInfoRepository; BackgroundJobInfoRepository = backgroundJobInfoRepository;
} }
@ -30,15 +29,6 @@ public class BackgroundJobManager : DomainService
{ {
await BackgroundJobInfoRepository.InsertAsync(jobInfo); await BackgroundJobInfoRepository.InsertAsync(jobInfo);
if (jobInfo.IsEnabled && jobInfo.JobType == JobType.Period)
{
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await JobScheduler.QueueAsync(job);
});
}
return jobInfo; return jobInfo;
} }
@ -50,8 +40,13 @@ public class BackgroundJobManager : DomainService
{ {
UnitOfWorkManager.Current.OnCompleted(async () => UnitOfWorkManager.Current.OnCompleted(async () =>
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo); await EventBus.PublishAsync(
await JobScheduler.RemoveAsync(job); new JobStopEventData
{
IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
NodeName = jobInfo.NodeName
});
}); });
} }
@ -59,7 +54,13 @@ public class BackgroundJobManager : DomainService
{ {
UnitOfWorkManager.Current.OnCompleted(async () => UnitOfWorkManager.Current.OnCompleted(async () =>
{ {
await QueueAsync(jobInfo); await EventBus.PublishAsync(
new JobStartEventData
{
IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
NodeName = jobInfo.NodeName
});
}); });
} }
@ -69,107 +70,185 @@ public class BackgroundJobManager : DomainService
public virtual async Task DeleteAsync(BackgroundJobInfo jobInfo) public virtual async Task DeleteAsync(BackgroundJobInfo jobInfo)
{ {
await BackgroundJobInfoRepository.DeleteAsync(jobInfo); await BackgroundJobInfoRepository.DeleteAsync(jobInfo);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo);
await JobScheduler.RemoveAsync(job);
});
} }
public virtual async Task BulkDeleteAsync(IEnumerable<BackgroundJobInfo> jobInfos) public virtual async Task BulkDeleteAsync(IEnumerable<BackgroundJobInfo> jobInfos)
{ {
foreach (var jobInfo in jobInfos) await BackgroundJobInfoRepository.DeleteManyAsync(jobInfos);
{
await DeleteAsync(jobInfo);
}
} }
public virtual async Task QueueAsync(BackgroundJobInfo jobInfo) public virtual async Task QueueAsync(BackgroundJobInfo jobInfo)
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo); await EventBus.PublishAsync(
await JobScheduler.QueueAsync(job); new JobStartEventData
{
IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
NodeName = jobInfo.NodeName
});
} }
public virtual async Task BulkQueueAsync(IEnumerable<BackgroundJobInfo> jobInfos) public virtual async Task BulkQueueAsync(IEnumerable<BackgroundJobInfo> jobInfos)
{ {
var jobs = ObjectMapper.Map<IEnumerable<BackgroundJobInfo>, List<JobInfo>>(jobInfos); if (jobInfos.Any())
await JobScheduler.QueuesAsync(jobs); {
await EventBus.PublishAsync(
new JobStartEventData
{
IdList = jobInfos.Select(x => x.Id).ToList(),
TenantId = jobInfos.Select(x => x.TenantId).First(),
NodeName = jobInfos.Select(x => x.NodeName).First()
});
}
} }
public virtual async Task TriggerAsync(BackgroundJobInfo jobInfo) public virtual async Task TriggerAsync(BackgroundJobInfo jobInfo)
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo); await EventBus.PublishAsync(
job.JobType = JobType.Once; new JobTriggerEventData
// 延迟两秒触发 {
job.Interval = 2; IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
await JobScheduler.TriggerAsync(job); NodeName = jobInfo.NodeName
});
} }
public virtual async Task BulkTriggerAsync(IEnumerable<BackgroundJobInfo> jobInfos) public virtual async Task BulkTriggerAsync(IEnumerable<BackgroundJobInfo> jobInfos)
{ {
foreach (var jobInfo in jobInfos) if (jobInfos.Any())
{ {
await TriggerAsync(jobInfo); await EventBus.PublishAsync(
new JobTriggerEventData
{
IdList = jobInfos.Select(x => x.Id).ToList(),
TenantId = jobInfos.Select(x => x.TenantId).First(),
NodeName = jobInfos.Select(x => x.NodeName).First()
});
} }
} }
public virtual async Task PauseAsync(BackgroundJobInfo jobInfo) public virtual async Task PauseAsync(BackgroundJobInfo jobInfo)
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo);
await JobScheduler.PauseAsync(job);
jobInfo.SetStatus(JobStatus.Paused); jobInfo.SetStatus(JobStatus.Paused);
jobInfo.SetNextRunTime(null); jobInfo.SetNextRunTime(null);
await BackgroundJobInfoRepository.UpdateAsync(jobInfo); await BackgroundJobInfoRepository.UpdateAsync(jobInfo);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await EventBus.PublishAsync(
new JobPauseEventData
{
IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
NodeName = jobInfo.NodeName
});
});
} }
public virtual async Task BulkPauseAsync(IEnumerable<BackgroundJobInfo> jobInfos) public virtual async Task BulkPauseAsync(IEnumerable<BackgroundJobInfo> jobInfos)
{ {
foreach (var jobInfo in jobInfos) foreach (var jobInfo in jobInfos)
{ {
await PauseAsync(jobInfo); jobInfo.SetStatus(JobStatus.Paused);
jobInfo.SetNextRunTime(null);
} }
await BackgroundJobInfoRepository.UpdateManyAsync(jobInfos);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await EventBus.PublishAsync(
new JobPauseEventData
{
IdList = jobInfos.Select(x => x.Id).ToList(),
TenantId = jobInfos.Select(x => x.TenantId).First(),
NodeName = jobInfos.Select(x => x.NodeName).First()
});
});
} }
public virtual async Task ResumeAsync(BackgroundJobInfo jobInfo) public virtual async Task ResumeAsync(BackgroundJobInfo jobInfo)
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo);
await JobScheduler.ResumeAsync(job);
jobInfo.SetStatus(JobStatus.Running); jobInfo.SetStatus(JobStatus.Running);
jobInfo.IsAbandoned = false; jobInfo.IsAbandoned = false;
jobInfo.IsEnabled = true; jobInfo.IsEnabled = true;
await BackgroundJobInfoRepository.UpdateAsync(jobInfo); await BackgroundJobInfoRepository.UpdateAsync(jobInfo);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await EventBus.PublishAsync(
new JobResumeEventData
{
IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
NodeName = jobInfo.NodeName
});
});
} }
public virtual async Task BulkResumeAsync(IEnumerable<BackgroundJobInfo> jobInfos) public virtual async Task BulkResumeAsync(IEnumerable<BackgroundJobInfo> jobInfos)
{ {
foreach (var jobInfo in jobInfos) foreach (var jobInfo in jobInfos)
{ {
await ResumeAsync(jobInfo); jobInfo.SetStatus(JobStatus.Running);
jobInfo.IsAbandoned = false;
jobInfo.IsEnabled = true;
} }
await BackgroundJobInfoRepository.UpdateManyAsync(jobInfos);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await EventBus.PublishAsync(
new JobResumeEventData
{
IdList = jobInfos.Select(x => x.Id).ToList(),
TenantId = jobInfos.Select(x => x.TenantId).First(),
NodeName = jobInfos.Select(x => x.NodeName).First()
});
});
} }
public virtual async Task StopAsync(BackgroundJobInfo jobInfo) public virtual async Task StopAsync(BackgroundJobInfo jobInfo)
{ {
var job = ObjectMapper.Map<BackgroundJobInfo, JobInfo>(jobInfo);
await JobScheduler.RemoveAsync(job);
jobInfo.SetStatus(JobStatus.Stopped); jobInfo.SetStatus(JobStatus.Stopped);
jobInfo.SetNextRunTime(null); jobInfo.SetNextRunTime(null);
await BackgroundJobInfoRepository.UpdateAsync(jobInfo); await BackgroundJobInfoRepository.UpdateAsync(jobInfo);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await EventBus.PublishAsync(
new JobStopEventData
{
IdList = new List<string> { jobInfo.Id },
TenantId = jobInfo.TenantId,
NodeName = jobInfo.NodeName
});
});
} }
public virtual async Task BulkStopAsync(IEnumerable<BackgroundJobInfo> jobInfos) public virtual async Task BulkStopAsync(IEnumerable<BackgroundJobInfo> jobInfos)
{ {
foreach (var jobInfo in jobInfos) foreach (var jobInfo in jobInfos)
{ {
await StopAsync(jobInfo); jobInfo.SetStatus(JobStatus.Stopped);
jobInfo.SetNextRunTime(null);
} }
await BackgroundJobInfoRepository.UpdateManyAsync(jobInfos);
UnitOfWorkManager.Current.OnCompleted(async () =>
{
await EventBus.PublishAsync(
new JobStopEventData
{
IdList = jobInfos.Select(x => x.Id).ToList(),
TenantId = jobInfos.Select(x => x.TenantId).First(),
NodeName = jobInfos.Select(x => x.NodeName).First()
});
});
} }
} }

60
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/BackgroundJobSynchronizer.cs

@ -0,0 +1,60 @@
using LINGYUN.Abp.BackgroundTasks;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EventBus.Distributed;
namespace LINGYUN.Abp.TaskManagement;
public class BackgroundJobSynchronizer :
IDistributedEventHandler<EntityCreatedEto<BackgroundJobEto>>,
IDistributedEventHandler<EntityDeletedEto<BackgroundJobEto>>,
ITransientDependency
{
protected IJobStore JobStore { get; }
protected IJobScheduler JobScheduler { get; }
protected AbpBackgroundTasksOptions BackgroundTasksOptions { get; }
public BackgroundJobSynchronizer(
IJobStore jobStore,
IJobScheduler jobScheduler,
IOptions<AbpBackgroundTasksOptions> options)
{
JobStore = jobStore;
JobScheduler = jobScheduler;
BackgroundTasksOptions = options.Value;
}
public async virtual Task HandleEventAsync(EntityDeletedEto<BackgroundJobEto> eventData)
{
if (string.Equals(eventData.Entity.NodeName, BackgroundTasksOptions.NodeName))
{
var jobInfo = await JobStore.FindAsync(eventData.Entity.Id);
if (jobInfo == null)
{
return;
}
await JobScheduler.RemoveAsync(jobInfo);
}
}
public async virtual Task HandleEventAsync(EntityCreatedEto<BackgroundJobEto> eventData)
{
if (string.Equals(eventData.Entity.NodeName, BackgroundTasksOptions.NodeName))
{
var jobInfo = await JobStore.FindAsync(eventData.Entity.Id);
if (jobInfo == null)
{
return;
}
if (eventData.Entity.IsEnabled && jobInfo.JobType == JobType.Period)
{
await JobScheduler.QueueAsync(jobInfo);
}
}
}
}

1
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/TaskManagementDomainMapperProfile.cs

@ -8,5 +8,6 @@ public class TaskManagementDomainMapperProfile : Profile
public TaskManagementDomainMapperProfile() public TaskManagementDomainMapperProfile()
{ {
CreateMap<BackgroundJobInfo, JobInfo>(); CreateMap<BackgroundJobInfo, JobInfo>();
CreateMap<BackgroundJobInfo, BackgroundJobEto>();
} }
} }

11
aspnet-core/modules/task-management/LINGYUN.Abp.TaskManagement.Domain/LINGYUN/Abp/TaskManagement/TaskManagementDomainModule.cs

@ -1,7 +1,9 @@
using LINGYUN.Abp.BackgroundTasks; using LINGYUN.Abp.BackgroundTasks;
using LINGYUN.Abp.BackgroundTasks.EventBus;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.AutoMapper; using Volo.Abp.AutoMapper;
using Volo.Abp.Domain; using Volo.Abp.Domain;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
namespace LINGYUN.Abp.TaskManagement; namespace LINGYUN.Abp.TaskManagement;
@ -10,6 +12,7 @@ namespace LINGYUN.Abp.TaskManagement;
[DependsOn(typeof(AbpAutoMapperModule))] [DependsOn(typeof(AbpAutoMapperModule))]
[DependsOn(typeof(AbpDddDomainModule))] [DependsOn(typeof(AbpDddDomainModule))]
[DependsOn(typeof(AbpBackgroundTasksModule))] [DependsOn(typeof(AbpBackgroundTasksModule))]
[DependsOn(typeof(AbpBackgroundTasksEventBusModule))]
public class TaskManagementDomainModule : AbpModule public class TaskManagementDomainModule : AbpModule
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
@ -20,5 +23,13 @@ public class TaskManagementDomainModule : AbpModule
{ {
options.AddProfile<TaskManagementDomainMapperProfile>(validate: true); options.AddProfile<TaskManagementDomainMapperProfile>(validate: true);
}); });
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.EtoMappings.Add<BackgroundJobInfo, BackgroundJobEto>(typeof(TaskManagementDomainModule));
options.AutoEventSelectors.Add<BackgroundJobInfo>();
});
} }
} }

5
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain.Shared/LINGYUN/Abp/WebhooksManagement/WebhookEventEto.cs

@ -1,10 +1,11 @@
using System; using System;
using Volo.Abp.MultiTenancy; using Volo.Abp.EventBus;
namespace LINGYUN.Abp.WebhooksManagement; namespace LINGYUN.Abp.WebhooksManagement;
[Serializable] [Serializable]
public class WebhookEventEto : IMultiTenant [EventName("abp.webhooks.event")]
public class WebhookEventEto
{ {
public Guid Id { get; set; } public Guid Id { get; set; }
public Guid? TenantId { get; set; } public Guid? TenantId { get; set; }

2
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain.Shared/LINGYUN/Abp/WebhooksManagement/WebhookSendAttemptEto.cs

@ -1,9 +1,11 @@
using System; using System;
using Volo.Abp.EventBus;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.WebhooksManagement; namespace LINGYUN.Abp.WebhooksManagement;
[Serializable] [Serializable]
[EventName("abp.webhooks.send-attempt")]
public class WebhookSendAttemptEto : IMultiTenant public class WebhookSendAttemptEto : IMultiTenant
{ {
public Guid Id { get; set; } public Guid Id { get; set; }

2
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain.Shared/LINGYUN/Abp/WebhooksManagement/WebhookSubscriptionEto.cs

@ -1,9 +1,11 @@
using System; using System;
using Volo.Abp.EventBus;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.WebhooksManagement; namespace LINGYUN.Abp.WebhooksManagement;
[Serializable] [Serializable]
[EventName("abp.webhooks.subscription")]
public class WebhookSubscriptionEto : IMultiTenant public class WebhookSubscriptionEto : IMultiTenant
{ {
public Guid Id { get; set; } public Guid Id { get; set; }

2
aspnet-core/services/LY.MicroService.TaskManagement.HttpApi.Host/TaskManagementHttpApiHostModule.cs

@ -93,7 +93,7 @@ public partial class TaskManagementHttpApiHostModule : AbpModule
ConfigureSecurity(context.Services, configuration, hostingEnvironment.IsDevelopment()); ConfigureSecurity(context.Services, configuration, hostingEnvironment.IsDevelopment());
// 开发取消权限检查 // 开发取消权限检查
// context.Services.AddAlwaysAllowAuthorization(); context.Services.AddAlwaysAllowAuthorization();
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)

Loading…
Cancel
Save