committed by
GitHub
60 changed files with 2484 additions and 1 deletions
@ -0,0 +1,12 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>netstandard2.0</TargetFramework> |
||||
|
<RootNamespace /> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<ProjectReference Include="..\..\elasticsearch\LINGYUN.Abp.Elasticsearch\LINGYUN.Abp.Elasticsearch.csproj" /> |
||||
|
<ProjectReference Include="..\LINGYUN.Abp.WorkflowCore\LINGYUN.Abp.WorkflowCore.csproj" /> |
||||
|
</ItemGroup> |
||||
|
</Project> |
||||
@ -0,0 +1,140 @@ |
|||||
|
using LINGYUN.Abp.Elasticsearch; |
||||
|
using LINGYUN.Abp.WorkflowCore.Elasticsearch.Models; |
||||
|
using Microsoft.Extensions.Logging; |
||||
|
using Microsoft.Extensions.Options; |
||||
|
using Nest; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Linq.Expressions; |
||||
|
using System.Threading.Tasks; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
using WorkflowCore.Models.Search; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Elasticsearch |
||||
|
{ |
||||
|
public class AbpElasticsearchIndexer : ISearchIndex |
||||
|
{ |
||||
|
private IElasticClient _client; |
||||
|
|
||||
|
private readonly IElasticsearchClientFactory _elasticsearchClientFactory; |
||||
|
private readonly AbpWorkflowCoreElasticsearchOptions _options; |
||||
|
private readonly ILogger<AbpElasticsearchIndexer> _logger; |
||||
|
|
||||
|
public AbpElasticsearchIndexer( |
||||
|
ILogger<AbpElasticsearchIndexer> logger, |
||||
|
IOptions<AbpWorkflowCoreElasticsearchOptions> options, |
||||
|
IElasticsearchClientFactory elasticsearchClientFactory) |
||||
|
{ |
||||
|
_logger = logger; |
||||
|
_options = options.Value; |
||||
|
_elasticsearchClientFactory = elasticsearchClientFactory; |
||||
|
} |
||||
|
|
||||
|
public async Task IndexWorkflow(WorkflowInstance workflow) |
||||
|
{ |
||||
|
if (_client == null) |
||||
|
throw new InvalidOperationException("Not started"); |
||||
|
|
||||
|
var denormModel = WorkflowSearchModel.FromWorkflowInstance(workflow); |
||||
|
|
||||
|
var result = await _client.IndexAsync( |
||||
|
denormModel, |
||||
|
idx => idx.Index(_options.IndexFormat)); |
||||
|
|
||||
|
if (!result.ApiCall.Success) |
||||
|
{ |
||||
|
_logger.LogError(default(EventId), result.ApiCall.OriginalException, $"Failed to index workflow {workflow.Id}"); |
||||
|
throw new ApplicationException($"Failed to index workflow {workflow.Id}", result.ApiCall.OriginalException); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public async Task<Page<WorkflowSearchResult>> Search(string terms, int skip, int take, params SearchFilter[] filters) |
||||
|
{ |
||||
|
if (_client == null) |
||||
|
throw new InvalidOperationException("Not started"); |
||||
|
|
||||
|
var result = await _client.SearchAsync<WorkflowSearchModel>(s => s |
||||
|
.Index(_options.IndexFormat) |
||||
|
.Skip(skip) |
||||
|
.Take(take) |
||||
|
.MinScore(!string.IsNullOrEmpty(terms) ? 0.1 : 0) |
||||
|
.Query(query => query |
||||
|
.Bool(b => b |
||||
|
.Filter(BuildFilterQuery(filters)) |
||||
|
.Should( |
||||
|
should => should.Match(t => t.Field(f => f.Reference).Query(terms).Boost(1.2)), |
||||
|
should => should.Match(t => t.Field(f => f.DataTokens).Query(terms).Boost(1.1)), |
||||
|
should => should.Match(t => t.Field(f => f.WorkflowDefinitionId).Query(terms).Boost(0.9)), |
||||
|
should => should.Match(t => t.Field(f => f.Status).Query(terms).Boost(0.9)), |
||||
|
should => should.Match(t => t.Field(f => f.Description).Query(terms)) |
||||
|
) |
||||
|
) |
||||
|
) |
||||
|
); |
||||
|
|
||||
|
return new Page<WorkflowSearchResult> |
||||
|
{ |
||||
|
Total = result.Total, |
||||
|
Data = result.Hits.Select(x => x.Source).Select(x => x.ToSearchResult()).ToList() |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
public async Task Start() |
||||
|
{ |
||||
|
_client = _elasticsearchClientFactory.Create(); |
||||
|
var nodeInfo = await _client.Nodes.InfoAsync(); |
||||
|
if (nodeInfo.Nodes.Values.Any(x => Convert.ToUInt32(x.Version.Split('.')[0]) < 6)) |
||||
|
throw new NotSupportedException("Elasticsearch verison 6 or greater is required"); |
||||
|
|
||||
|
var exists = await _client.Indices.ExistsAsync(_options.IndexFormat); |
||||
|
if (!exists.Exists) |
||||
|
{ |
||||
|
await _client.Indices.CreateAsync(_options.IndexFormat); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public Task Stop() |
||||
|
{ |
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
private List<Func<QueryContainerDescriptor<WorkflowSearchModel>, QueryContainer>> BuildFilterQuery(SearchFilter[] filters) |
||||
|
{ |
||||
|
var result = new List<Func<QueryContainerDescriptor<WorkflowSearchModel>, QueryContainer>>(); |
||||
|
|
||||
|
foreach (var filter in filters) |
||||
|
{ |
||||
|
var field = new Field(filter.Property); |
||||
|
if (filter.IsData) |
||||
|
{ |
||||
|
Expression<Func<WorkflowSearchModel, object>> dataExpr = x => x.Data[filter.DataType.FullName]; |
||||
|
var fieldExpr = Expression.Convert(filter.Property, typeof(Func<object, object>)); |
||||
|
field = new Field(Expression.Lambda(Expression.Invoke(fieldExpr, dataExpr), Expression.Parameter(typeof(WorkflowSearchModel)))); |
||||
|
} |
||||
|
|
||||
|
switch (filter) |
||||
|
{ |
||||
|
case ScalarFilter f: |
||||
|
result.Add(x => x.Match(t => t.Field(field).Query(Convert.ToString(f.Value)))); |
||||
|
break; |
||||
|
case DateRangeFilter f: |
||||
|
if (f.BeforeValue.HasValue) |
||||
|
result.Add(x => x.DateRange(t => t.Field(field).LessThan(f.BeforeValue))); |
||||
|
if (f.AfterValue.HasValue) |
||||
|
result.Add(x => x.DateRange(t => t.Field(field).GreaterThan(f.AfterValue))); |
||||
|
break; |
||||
|
case NumericRangeFilter f: |
||||
|
if (f.LessValue.HasValue) |
||||
|
result.Add(x => x.Range(t => t.Field(field).LessThan(f.LessValue))); |
||||
|
if (f.GreaterValue.HasValue) |
||||
|
result.Add(x => x.Range(t => t.Field(field).GreaterThan(f.GreaterValue))); |
||||
|
break; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return result; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,24 @@ |
|||||
|
using LINGYUN.Abp.Elasticsearch; |
||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using Volo.Abp.Modularity; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Elasticsearch |
||||
|
{ |
||||
|
[DependsOn(typeof(AbpWorkflowCoreModule))] |
||||
|
[DependsOn(typeof(AbpElasticsearchModule))] |
||||
|
public class AbpWorkflowCoreElasticsearchModule : AbpModule |
||||
|
{ |
||||
|
public override void PreConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
context.Services.AddSingleton<ISearchIndex, AbpElasticsearchIndexer>(); |
||||
|
context.Services.AddSingleton<AbpElasticsearchIndexer>(); |
||||
|
|
||||
|
PreConfigure<WorkflowOptions>(options => |
||||
|
{ |
||||
|
options.UseSearchIndex(provider => provider.GetRequiredService<AbpElasticsearchIndexer>()); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
namespace LINGYUN.Abp.WorkflowCore.Elasticsearch |
||||
|
{ |
||||
|
public class AbpWorkflowCoreElasticsearchOptions |
||||
|
{ |
||||
|
/// <summary>
|
||||
|
/// Default value: "workflows".
|
||||
|
/// </summary>
|
||||
|
public string IndexFormat { get; set; } |
||||
|
public AbpWorkflowCoreElasticsearchOptions() |
||||
|
{ |
||||
|
IndexFormat = "workflows"; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,120 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
using WorkflowCore.Models.Search; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Elasticsearch.Models |
||||
|
{ |
||||
|
public class WorkflowSearchModel |
||||
|
{ |
||||
|
public string Id { get; set; } |
||||
|
|
||||
|
public string WorkflowDefinitionId { get; set; } |
||||
|
|
||||
|
public int Version { get; set; } |
||||
|
|
||||
|
public string Description { get; set; } |
||||
|
|
||||
|
public string Reference { get; set; } |
||||
|
|
||||
|
public DateTime? NextExecutionUtc { get; set; } |
||||
|
|
||||
|
public string Status { get; set; } |
||||
|
|
||||
|
public Dictionary<string, object> Data { get; set; } = new Dictionary<string, object>(); |
||||
|
|
||||
|
public IEnumerable<string> DataTokens { get; set; } |
||||
|
|
||||
|
public DateTime CreateTime { get; set; } |
||||
|
|
||||
|
public DateTime? CompleteTime { get; set; } |
||||
|
|
||||
|
public ICollection<StepInfo> WaitingSteps { get; set; } = new HashSet<StepInfo>(); |
||||
|
|
||||
|
public ICollection<StepInfo> SleepingSteps { get; set; } = new HashSet<StepInfo>(); |
||||
|
|
||||
|
public ICollection<StepInfo> FailedSteps { get; set; } = new HashSet<StepInfo>(); |
||||
|
|
||||
|
public WorkflowSearchResult ToSearchResult() |
||||
|
{ |
||||
|
var result = new WorkflowSearchResult |
||||
|
{ |
||||
|
Id = Id, |
||||
|
CompleteTime = CompleteTime, |
||||
|
CreateTime = CreateTime, |
||||
|
Description = Description, |
||||
|
NextExecutionUtc = NextExecutionUtc, |
||||
|
Reference = Reference, |
||||
|
Status = (WorkflowStatus)Enum.Parse(typeof(WorkflowStatus), Status, true), |
||||
|
Version = Version, |
||||
|
WorkflowDefinitionId = WorkflowDefinitionId, |
||||
|
FailedSteps = FailedSteps, |
||||
|
SleepingSteps = SleepingSteps, |
||||
|
WaitingSteps = WaitingSteps |
||||
|
}; |
||||
|
|
||||
|
if (Data.Count > 0) |
||||
|
result.Data = Data.First().Value; |
||||
|
|
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
public static WorkflowSearchModel FromWorkflowInstance(WorkflowInstance workflow) |
||||
|
{ |
||||
|
var result = new WorkflowSearchModel(); |
||||
|
|
||||
|
result.Id = workflow.Id; |
||||
|
result.WorkflowDefinitionId = workflow.WorkflowDefinitionId; |
||||
|
result.Description = workflow.Description; |
||||
|
result.Reference = workflow.Reference; |
||||
|
|
||||
|
if (workflow.Data != null) |
||||
|
result.Data.Add(workflow.Data.GetType().FullName, workflow.Data); |
||||
|
|
||||
|
result.CompleteTime = workflow.CompleteTime; |
||||
|
result.CreateTime = workflow.CreateTime; |
||||
|
result.Version = workflow.Version; |
||||
|
result.Status = workflow.Status.ToString(); |
||||
|
|
||||
|
if (workflow.NextExecution.HasValue) |
||||
|
result.NextExecutionUtc = new DateTime(workflow.NextExecution.Value); |
||||
|
|
||||
|
if (workflow.Data is ISearchable) |
||||
|
result.DataTokens = (workflow.Data as ISearchable).GetSearchTokens(); |
||||
|
|
||||
|
foreach (var ep in workflow.ExecutionPointers) |
||||
|
{ |
||||
|
if (ep.Status == PointerStatus.Sleeping) |
||||
|
{ |
||||
|
result.SleepingSteps.Add(new StepInfo |
||||
|
{ |
||||
|
StepId = ep.StepId, |
||||
|
Name = ep.StepName |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
if (ep.Status == PointerStatus.WaitingForEvent) |
||||
|
{ |
||||
|
result.WaitingSteps.Add(new StepInfo |
||||
|
{ |
||||
|
StepId = ep.StepId, |
||||
|
Name = ep.StepName |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
if (ep.Status == PointerStatus.Failed) |
||||
|
{ |
||||
|
result.FailedSteps.Add(new StepInfo |
||||
|
{ |
||||
|
StepId = ep.StepId, |
||||
|
Name = ep.StepName |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return result; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,16 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>netstandard2.0</TargetFramework> |
||||
|
<RootNamespace /> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<PackageReference Include="Volo.Abp.EventBus" Version="4.4.0" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<ProjectReference Include="..\LINGYUN.Abp.WorkflowCore\LINGYUN.Abp.WorkflowCore.csproj" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
</Project> |
||||
@ -0,0 +1,86 @@ |
|||||
|
using Microsoft.Extensions.Logging; |
||||
|
using Newtonsoft.Json; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Threading.Tasks; |
||||
|
using Volo.Abp.EventBus.Distributed; |
||||
|
using Volo.Abp.Json; |
||||
|
using WorkflowCore.Interface; |
||||
|
using EventData = WorkflowCore.Models.LifeCycleEvents.LifeCycleEvent; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.LifeCycleEvent |
||||
|
{ |
||||
|
public class AbpEventBusProvider : ILifeCycleEventHub |
||||
|
{ |
||||
|
private bool _started = false; |
||||
|
private Queue<Action<EventData>> _deferredSubscribers = new Queue<Action<EventData>>(); |
||||
|
|
||||
|
private readonly IDistributedEventBus _eventBus; |
||||
|
private readonly ILoggerFactory _loggerFactory; |
||||
|
|
||||
|
private readonly JsonSerializerSettings _serializerSettings = new JsonSerializerSettings |
||||
|
{ |
||||
|
TypeNameHandling = TypeNameHandling.All, |
||||
|
ReferenceLoopHandling = ReferenceLoopHandling.Error, |
||||
|
}; |
||||
|
|
||||
|
public AbpEventBusProvider( |
||||
|
ILoggerFactory loggerFactory, |
||||
|
IDistributedEventBus distributedEventBus) |
||||
|
{ |
||||
|
_loggerFactory = loggerFactory; |
||||
|
_eventBus = distributedEventBus; |
||||
|
} |
||||
|
|
||||
|
public async Task PublishNotification(EventData evt) |
||||
|
{ |
||||
|
var data = evt.SerializeObject(_serializerSettings); |
||||
|
var wrapEvent = new LifeCycleEventWrap(data); |
||||
|
await _eventBus.PublishAsync(wrapEvent); |
||||
|
} |
||||
|
|
||||
|
public Task Start() |
||||
|
{ |
||||
|
_started = true; |
||||
|
while (_deferredSubscribers.Count > 0) |
||||
|
{ |
||||
|
var action = _deferredSubscribers.Dequeue(); |
||||
|
_eventBus.Subscribe<LifeCycleEventWrap>((data) => |
||||
|
{ |
||||
|
var unWrapData = data.Data.DeserializeObject(_serializerSettings); |
||||
|
action(unWrapData as EventData); |
||||
|
|
||||
|
return Task.CompletedTask; |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
public Task Stop() |
||||
|
{ |
||||
|
// TODO
|
||||
|
_started = false; |
||||
|
|
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
public void Subscribe(Action<EventData> action) |
||||
|
{ |
||||
|
if (_started) |
||||
|
{ |
||||
|
_eventBus.Subscribe<LifeCycleEventWrap>((data) => |
||||
|
{ |
||||
|
var unWrapData = data.Data.DeserializeObject(_serializerSettings); |
||||
|
action(unWrapData as EventData); |
||||
|
|
||||
|
return Task.CompletedTask; |
||||
|
}); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
_deferredSubscribers.Enqueue(action); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,40 @@ |
|||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using Volo.Abp.EventBus; |
||||
|
using Volo.Abp.Json; |
||||
|
using Volo.Abp.Json.SystemTextJson; |
||||
|
using Volo.Abp.Modularity; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
using EventData = WorkflowCore.Models.LifeCycleEvents.LifeCycleEvent; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.LifeCycleEvent |
||||
|
{ |
||||
|
[DependsOn(typeof(AbpEventBusModule))] |
||||
|
[DependsOn(typeof(AbpWorkflowCoreModule))] |
||||
|
public class AbpWorkflowCoreLifeCycleEventModule : AbpModule |
||||
|
{ |
||||
|
public override void PreConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
context.Services.AddSingleton<ILifeCycleEventHub, AbpEventBusProvider>(); |
||||
|
context.Services.AddSingleton<AbpEventBusProvider>(); |
||||
|
|
||||
|
PreConfigure<WorkflowOptions>(options => |
||||
|
{ |
||||
|
options.UseEventHub(provider => provider.GetRequiredService<AbpEventBusProvider>()); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public override void ConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
Configure<AbpJsonOptions>(options => |
||||
|
{ |
||||
|
options.UseHybridSerializer = true; |
||||
|
}); |
||||
|
|
||||
|
Configure<AbpSystemTextJsonSerializerOptions>(options => |
||||
|
{ |
||||
|
options.UnsupportedTypes.TryAdd<EventData>(); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,12 @@ |
|||||
|
namespace LINGYUN.Abp.WorkflowCore.LifeCycleEvent |
||||
|
{ |
||||
|
public class LifeCycleEventWrap |
||||
|
{ |
||||
|
public string Data { get; set; } |
||||
|
public LifeCycleEventWrap() { } |
||||
|
public LifeCycleEventWrap(string data) |
||||
|
{ |
||||
|
Data = data; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,16 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>netstandard2.1</TargetFramework> |
||||
|
<RootNamespace /> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<PackageReference Include="Volo.Abp.EntityFrameworkCore" Version="4.4.0" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<ProjectReference Include="..\LINGYUN.Abp.WorkflowCore.Persistence\LINGYUN.Abp.WorkflowCore.Persistence.csproj" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
</Project> |
||||
@ -0,0 +1,23 @@ |
|||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
using Volo.Abp.Modularity; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore |
||||
|
{ |
||||
|
[DependsOn(typeof(AbpWorkflowCorePersistenceModule))] |
||||
|
[DependsOn(typeof(AbpEntityFrameworkCoreModule))] |
||||
|
public class AbpWorkflowCorePersistenceEntityFrameworkCoreModule : AbpModule |
||||
|
{ |
||||
|
public override void ConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
context.Services.AddAbpDbContext<WorkflowDbContext>(options => |
||||
|
{ |
||||
|
options.AddRepository<Workflow, EfCoreWorkflowRepository>(); |
||||
|
options.AddRepository<WorkflowEvent, EfCoreWorkflowEventRepository>(); |
||||
|
options.AddRepository<WorkflowExecutionError, EfCoreWorkflowExecutionErrorRepository>(); |
||||
|
options.AddRepository<WorkflowScheduledCommand, EfCoreWorkflowScheduledCommandRepository>(); |
||||
|
options.AddRepository<WorkflowEventSubscription, EfCoreWorkflowEventSubscriptionRepository>(); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Repositories.EntityFrameworkCore; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore |
||||
|
{ |
||||
|
public class EfCoreWorkflowEventRepository : EfCoreRepository<WorkflowDbContext, WorkflowEvent, Guid>, IWorkflowEventRepository |
||||
|
{ |
||||
|
public EfCoreWorkflowEventRepository(IDbContextProvider<WorkflowDbContext> dbContextProvider) |
||||
|
: base(dbContextProvider) |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,15 @@ |
|||||
|
using LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore; |
||||
|
using System; |
||||
|
using Volo.Abp.Domain.Repositories.EntityFrameworkCore; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class EfCoreWorkflowEventSubscriptionRepository : EfCoreRepository<WorkflowDbContext, WorkflowEventSubscription, Guid>, IWorkflowEventSubscriptionRepository |
||||
|
{ |
||||
|
public EfCoreWorkflowEventSubscriptionRepository(IDbContextProvider<WorkflowDbContext> dbContextProvider) |
||||
|
: base(dbContextProvider) |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
using LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore; |
||||
|
using Volo.Abp.Domain.Repositories.EntityFrameworkCore; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class EfCoreWorkflowExecutionErrorRepository : EfCoreRepository<WorkflowDbContext, WorkflowExecutionError, int>, IWorkflowExecutionErrorRepository |
||||
|
{ |
||||
|
public EfCoreWorkflowExecutionErrorRepository(IDbContextProvider<WorkflowDbContext> dbContextProvider) |
||||
|
: base(dbContextProvider) |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,40 @@ |
|||||
|
using LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore; |
||||
|
using Microsoft.EntityFrameworkCore; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Volo.Abp.Domain.Repositories.EntityFrameworkCore; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class EfCoreWorkflowRepository : EfCoreRepository<WorkflowDbContext, Workflow, Guid>, IWorkflowRepository |
||||
|
{ |
||||
|
public EfCoreWorkflowRepository(IDbContextProvider<WorkflowDbContext> dbContextProvider) |
||||
|
: base(dbContextProvider) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<List<Workflow>> GetListAsync( |
||||
|
WorkflowStatus? status, |
||||
|
string type, |
||||
|
DateTime? createdFrom, |
||||
|
DateTime? createdTo, |
||||
|
int skip, |
||||
|
int take, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
return await (await GetDbSetAsync()) |
||||
|
.Include(x => x.ExecutionPointers) |
||||
|
.WhereIf(status.HasValue, x => x.Status == status.Value) |
||||
|
.WhereIf(!type.IsNullOrWhiteSpace(), x => x.WorkflowDefinitionId.Equals(type)) |
||||
|
.WhereIf(createdFrom.HasValue, x => x.CreationTime >= createdFrom.Value) |
||||
|
.WhereIf(createdTo.HasValue, x => x.CreationTime <= createdTo.Value) |
||||
|
.PageBy(skip, take) |
||||
|
.ToListAsync(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
using LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore; |
||||
|
using Volo.Abp.Domain.Repositories.EntityFrameworkCore; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class EfCoreWorkflowScheduledCommandRepository : EfCoreRepository<WorkflowDbContext, WorkflowScheduledCommand, long>, IWorkflowScheduledCommandRepository |
||||
|
{ |
||||
|
public EfCoreWorkflowScheduledCommandRepository(IDbContextProvider<WorkflowDbContext> dbContextProvider) |
||||
|
: base(dbContextProvider) |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,18 @@ |
|||||
|
using Microsoft.EntityFrameworkCore; |
||||
|
using Volo.Abp.Data; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore |
||||
|
{ |
||||
|
[ConnectionStringName(WorkflowDbProperties.ConnectionStringName)] |
||||
|
public interface IWorkflowDbContext : IEfCoreDbContext |
||||
|
{ |
||||
|
DbSet<Workflow> Workflows { get; set; } |
||||
|
DbSet<WorkflowEvent> WorkflowEvents { get; set; } |
||||
|
DbSet<WorkflowEventSubscription> WorkflowEventSubscriptions { get; set; } |
||||
|
DbSet<WorkflowExecutionError> WorkflowExecutionErrors { get; set; } |
||||
|
DbSet<WorkflowExecutionPointer> WorkflowExecutionPointers { get; set; } |
||||
|
DbSet<WorkflowExtensionAttribute> WorkflowExtensionAttributes { get; set; } |
||||
|
DbSet<WorkflowScheduledCommand> WorkflowScheduledCommands { get; set; } |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,29 @@ |
|||||
|
using Microsoft.EntityFrameworkCore; |
||||
|
using Volo.Abp.Data; |
||||
|
using Volo.Abp.EntityFrameworkCore; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore |
||||
|
{ |
||||
|
[ConnectionStringName(WorkflowDbProperties.ConnectionStringName)] |
||||
|
public class WorkflowDbContext : AbpDbContext<WorkflowDbContext>, IWorkflowDbContext |
||||
|
{ |
||||
|
public virtual DbSet<Workflow> Workflows { get; set; } |
||||
|
public virtual DbSet<WorkflowEvent> WorkflowEvents { get; set; } |
||||
|
public virtual DbSet<WorkflowEventSubscription> WorkflowEventSubscriptions { get; set; } |
||||
|
public virtual DbSet<WorkflowExecutionError> WorkflowExecutionErrors { get; set; } |
||||
|
public virtual DbSet<WorkflowExecutionPointer> WorkflowExecutionPointers { get; set; } |
||||
|
public virtual DbSet<WorkflowExtensionAttribute> WorkflowExtensionAttributes { get; set; } |
||||
|
public virtual DbSet<WorkflowScheduledCommand> WorkflowScheduledCommands { get; set; } |
||||
|
|
||||
|
public WorkflowDbContext(DbContextOptions<WorkflowDbContext> options) : base(options) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
protected override void OnModelCreating(ModelBuilder modelBuilder) |
||||
|
{ |
||||
|
base.OnModelCreating(modelBuilder); |
||||
|
|
||||
|
modelBuilder.ConfigureWorkflow(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,103 @@ |
|||||
|
using JetBrains.Annotations; |
||||
|
using Microsoft.EntityFrameworkCore; |
||||
|
using Volo.Abp; |
||||
|
using Volo.Abp.EntityFrameworkCore.Modeling; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence.EntityFrameworkCore |
||||
|
{ |
||||
|
public static class WorkflowDbContextModelBuilderExtensions |
||||
|
{ |
||||
|
public static void ConfigureWorkflow([NotNull] this ModelBuilder builder) |
||||
|
{ |
||||
|
Check.NotNull(builder, nameof(builder)); |
||||
|
|
||||
|
builder.Entity<Workflow>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "Workflow"); |
||||
|
|
||||
|
b.Property(p => p.WorkflowDefinitionId).HasMaxLength(200); |
||||
|
b.Property(p => p.Description).HasMaxLength(500); |
||||
|
b.Property(p => p.Reference).HasMaxLength(200); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
|
||||
|
b.HasIndex(p => p.NextExecution); |
||||
|
}); |
||||
|
|
||||
|
builder.Entity<WorkflowExecutionPointer>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "ExecutionPointer"); |
||||
|
|
||||
|
b.Property(p => p.Id).HasMaxLength(50); |
||||
|
b.Property(p => p.EventName).HasMaxLength(200); |
||||
|
b.Property(p => p.EventKey).HasMaxLength(200); |
||||
|
b.Property(p => p.StepName).HasMaxLength(100); |
||||
|
b.Property(p => p.PredecessorId).HasMaxLength(100); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
}); |
||||
|
|
||||
|
builder.Entity<WorkflowExtensionAttribute>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "ExtensionAttribute"); |
||||
|
|
||||
|
b.Property(p => p.Key).HasMaxLength(100); |
||||
|
b.Property(p => p.ExecutionPointerId).HasMaxLength(50); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
}); |
||||
|
|
||||
|
builder.Entity<WorkflowEvent>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "Event"); |
||||
|
|
||||
|
b.Property(p => p.EventName).HasMaxLength(200); |
||||
|
b.Property(p => p.EventKey).HasMaxLength(200); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
|
||||
|
b.HasIndex(x => new { x.EventName, x.EventKey }); |
||||
|
b.HasIndex(x => x.CreationTime); |
||||
|
b.HasIndex(x => x.IsProcessed); |
||||
|
}); |
||||
|
|
||||
|
builder.Entity<WorkflowEventSubscription>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "Subscription"); |
||||
|
|
||||
|
b.Property(p => p.ExecutionPointerId).HasMaxLength(50); |
||||
|
b.Property(p => p.EventName).HasMaxLength(200); |
||||
|
b.Property(p => p.EventKey).HasMaxLength(200); |
||||
|
b.Property(p => p.ExternalToken).HasMaxLength(200); |
||||
|
b.Property(p => p.ExternalWorkerId).HasMaxLength(200); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
|
||||
|
b.HasIndex(x => x.EventName); |
||||
|
b.HasIndex(x => x.EventKey); |
||||
|
}); |
||||
|
|
||||
|
builder.Entity<WorkflowExecutionError>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "ExecutionError"); |
||||
|
|
||||
|
b.Property(p => p.ExecutionPointerId).HasMaxLength(50); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
}); |
||||
|
|
||||
|
builder.Entity<WorkflowScheduledCommand>(b => |
||||
|
{ |
||||
|
b.ToTable(WorkflowDbProperties.TablePrefix + "ScheduledCommand"); |
||||
|
|
||||
|
b.Property(p => p.CommandName).HasMaxLength(200); |
||||
|
b.Property(p => p.Data).HasMaxLength(500); |
||||
|
|
||||
|
b.ConfigureByConvention(); |
||||
|
|
||||
|
b.HasIndex(x => x.ExecuteTime); |
||||
|
b.HasIndex(x => new { x.CommandName, x.Data }).IsUnique(); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,16 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>netstandard2.0</TargetFramework> |
||||
|
<RootNamespace /> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="4.4.0" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<ProjectReference Include="..\LINGYUN.Abp.WorkflowCore\LINGYUN.Abp.WorkflowCore.csproj" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
</Project> |
||||
@ -0,0 +1,22 @@ |
|||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using Volo.Abp.Modularity; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
[DependsOn(typeof(AbpWorkflowCoreModule))] |
||||
|
public class AbpWorkflowCorePersistenceModule : AbpModule |
||||
|
{ |
||||
|
public override void PreConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
context.Services.AddSingleton<IPersistenceProvider, AbpWorkflowPersistenceProvider>(); |
||||
|
context.Services.AddSingleton<AbpWorkflowPersistenceProvider>(); |
||||
|
|
||||
|
PreConfigure<WorkflowOptions>(options => |
||||
|
{ |
||||
|
options.UsePersistence(provider => provider.GetRequiredService<AbpWorkflowPersistenceProvider>()); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,354 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using Volo.Abp.Guids; |
||||
|
using Volo.Abp.Linq; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
using Volo.Abp.Uow; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class AbpWorkflowPersistenceProvider : IPersistenceProvider, IUnitOfWorkEnabled, ITransientDependency |
||||
|
{ |
||||
|
private readonly ICurrentTenant _currentTenant; |
||||
|
private readonly IGuidGenerator _guidGenerator; |
||||
|
private readonly IWorkflowRepository _workflowRepository; |
||||
|
private readonly IWorkflowEventRepository _workflowEventRepository; |
||||
|
private readonly IWorkflowExecutionErrorRepository _executionErrorRepository; |
||||
|
private readonly IWorkflowEventSubscriptionRepository _subscriptionRepository; |
||||
|
private readonly IWorkflowScheduledCommandRepository _scheduledCommandRepository; |
||||
|
|
||||
|
private readonly IAsyncQueryableExecuter _asyncQueryableExecuter; |
||||
|
|
||||
|
public bool SupportsScheduledCommands => true; |
||||
|
|
||||
|
public AbpWorkflowPersistenceProvider( |
||||
|
ICurrentTenant currentTenant, |
||||
|
IGuidGenerator guidGenerator, |
||||
|
IAsyncQueryableExecuter asyncQueryableExecuter, |
||||
|
IWorkflowRepository workflowRepository, |
||||
|
IWorkflowEventRepository workflowEventRepository, |
||||
|
IWorkflowExecutionErrorRepository executionErrorRepository, |
||||
|
IWorkflowEventSubscriptionRepository subscriptionRepository, |
||||
|
IWorkflowScheduledCommandRepository scheduledCommandRepository) |
||||
|
{ |
||||
|
_currentTenant = currentTenant; |
||||
|
_guidGenerator = guidGenerator; |
||||
|
_asyncQueryableExecuter = asyncQueryableExecuter; |
||||
|
|
||||
|
_workflowRepository = workflowRepository; |
||||
|
_workflowEventRepository = workflowEventRepository; |
||||
|
_executionErrorRepository = executionErrorRepository; |
||||
|
_subscriptionRepository = subscriptionRepository; |
||||
|
_scheduledCommandRepository = scheduledCommandRepository; |
||||
|
} |
||||
|
|
||||
|
public virtual async Task ClearSubscriptionToken( |
||||
|
string eventSubscriptionId, |
||||
|
string token, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var uid = Guid.Parse(eventSubscriptionId); |
||||
|
var existingEntity = await _subscriptionRepository.GetAsync(uid, cancellationToken: cancellationToken); |
||||
|
|
||||
|
if (existingEntity.ExternalToken != token) |
||||
|
throw new InvalidOperationException(); |
||||
|
|
||||
|
existingEntity.SetSubscriptionToken(null, null, null); |
||||
|
|
||||
|
await _subscriptionRepository.UpdateAsync(existingEntity, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<string> CreateEvent( |
||||
|
Event newEvent, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var we = newEvent.ToWorkflowEvent(_guidGenerator, _currentTenant); |
||||
|
|
||||
|
await _workflowEventRepository.InsertAsync(we, cancellationToken: cancellationToken); |
||||
|
|
||||
|
newEvent.Id = we.Id.ToString(); |
||||
|
|
||||
|
return newEvent.Id; |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<string> CreateEventSubscription( |
||||
|
EventSubscription subscription, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var wes = subscription.ToWorkflowEventSubscription(_guidGenerator, _currentTenant); |
||||
|
|
||||
|
await _subscriptionRepository.InsertAsync(wes, cancellationToken: cancellationToken); |
||||
|
|
||||
|
subscription.Id = wes.Id.ToString(); |
||||
|
|
||||
|
return subscription.Id; |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<string> CreateNewWorkflow( |
||||
|
WorkflowInstance workflow, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var wf = workflow.ToWorkflow(_guidGenerator, _currentTenant); |
||||
|
|
||||
|
await _workflowRepository.InsertAsync(wf, cancellationToken: cancellationToken); |
||||
|
|
||||
|
workflow.Id = wf.Id.ToString(); |
||||
|
|
||||
|
return workflow.Id; |
||||
|
} |
||||
|
|
||||
|
[UnitOfWork(IsDisabled = true)] |
||||
|
public void EnsureStoreExists() |
||||
|
{ |
||||
|
// TODO:
|
||||
|
} |
||||
|
|
||||
|
public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var eventId = Guid.Parse(id); |
||||
|
|
||||
|
var workflowEvent = await _workflowEventRepository.GetAsync(eventId, cancellationToken: cancellationToken); |
||||
|
|
||||
|
return workflowEvent.ToEvent(); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<IEnumerable<string>> GetEvents( |
||||
|
string eventName, |
||||
|
string eventKey, |
||||
|
DateTime asOf, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var queryable = await _workflowEventRepository.GetQueryableAsync(); |
||||
|
var workflowEventIds = await _asyncQueryableExecuter.ToListAsync( |
||||
|
queryable.Where(x => x.EventName == eventName && x.EventKey == eventKey) |
||||
|
.Where(x => x.CreationTime >= asOf) |
||||
|
.Select(x => x.Id), |
||||
|
cancellationToken); |
||||
|
|
||||
|
return workflowEventIds.Select(e => e.ToString()); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<EventSubscription> GetFirstOpenSubscription( |
||||
|
string eventName, |
||||
|
string eventKey, |
||||
|
DateTime asOf, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var queryable = await _subscriptionRepository.GetQueryableAsync(); |
||||
|
var workflowEventSubscription = await _asyncQueryableExecuter.FirstOrDefaultAsync( |
||||
|
queryable.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf && x.ExternalToken == null), |
||||
|
cancellationToken); |
||||
|
|
||||
|
return workflowEventSubscription?.ToEventSubscription(); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<IEnumerable<string>> GetRunnableEvents( |
||||
|
DateTime asAt, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var workflowEvents = await _workflowEventRepository.GetQueryableAsync(); |
||||
|
|
||||
|
var now = asAt.ToUniversalTime(); |
||||
|
|
||||
|
var workflowEventIdList = await _asyncQueryableExecuter.ToListAsync( |
||||
|
workflowEvents.Where(x => !x.IsProcessed) |
||||
|
.Where(x => x.CreationTime <= now) |
||||
|
.Select(x => x.Id), |
||||
|
cancellationToken); |
||||
|
|
||||
|
return workflowEventIdList.Select(e => e.ToString()); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<IEnumerable<string>> GetRunnableInstances( |
||||
|
DateTime asAt, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var now = asAt.ToUniversalTime().Ticks; |
||||
|
|
||||
|
var workflows = await _workflowRepository.GetQueryableAsync(); |
||||
|
var workflowIdList = await _asyncQueryableExecuter.ToListAsync( |
||||
|
workflows.Where(x => x.NextExecution.HasValue && (x.NextExecution <= now) && (x.Status == WorkflowStatus.Runnable)) |
||||
|
.Select(x => x.Id), |
||||
|
cancellationToken); |
||||
|
|
||||
|
return workflowIdList.Select(e => e.ToString()); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<EventSubscription> GetSubscription( |
||||
|
string eventSubscriptionId, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var subscriptionId = Guid.Parse(eventSubscriptionId); |
||||
|
var subscription = await _subscriptionRepository.FindAsync(subscriptionId, cancellationToken: cancellationToken); |
||||
|
|
||||
|
return subscription?.ToEventSubscription(); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions( |
||||
|
string eventName, |
||||
|
string eventKey, |
||||
|
DateTime asOf, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var now = asOf.ToUniversalTime(); |
||||
|
var subscriptions = await _subscriptionRepository.GetQueryableAsync(); |
||||
|
var eventSubscriptions = await _asyncQueryableExecuter.ToListAsync( |
||||
|
subscriptions.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= now), |
||||
|
cancellationToken); |
||||
|
|
||||
|
return eventSubscriptions.Select(x => x.ToEventSubscription()); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<WorkflowInstance> GetWorkflowInstance( |
||||
|
string Id, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var workflowId = Guid.Parse(Id); |
||||
|
var workflow = await _workflowRepository.FindAsync( |
||||
|
workflowId, |
||||
|
includeDetails: true, |
||||
|
cancellationToken: cancellationToken); |
||||
|
|
||||
|
return workflow?.ToWorkflowInstance(); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances( |
||||
|
WorkflowStatus? status, |
||||
|
string type, |
||||
|
DateTime? createdFrom, |
||||
|
DateTime? createdTo, |
||||
|
int skip, |
||||
|
int take) |
||||
|
{ |
||||
|
var workflows = await _workflowRepository.GetListAsync(status, type, createdFrom, createdTo, skip, take); |
||||
|
|
||||
|
return workflows.Select(x => x.ToWorkflowInstance()); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances( |
||||
|
IEnumerable<string> ids, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var workflowIds = ids.Select(id => Guid.Parse(id)); |
||||
|
|
||||
|
var queryable = await _workflowRepository.GetQueryableAsync(); |
||||
|
var workflows = await _asyncQueryableExecuter.ToListAsync( |
||||
|
queryable.Where(x => workflowIds.Contains(x.Id)), |
||||
|
cancellationToken); |
||||
|
|
||||
|
return workflows.Select(x => x.ToWorkflowInstance()); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var eventId = Guid.Parse(id); |
||||
|
var workflowEvent = await _workflowEventRepository.GetAsync(eventId, cancellationToken: cancellationToken); |
||||
|
|
||||
|
workflowEvent.IsProcessed = true; |
||||
|
|
||||
|
await _workflowEventRepository.UpdateAsync(workflowEvent, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var eventId = Guid.Parse(id); |
||||
|
var workflowEvent = await _workflowEventRepository.GetAsync(eventId, cancellationToken: cancellationToken); |
||||
|
|
||||
|
workflowEvent.IsProcessed = false; |
||||
|
|
||||
|
await _workflowEventRepository.UpdateAsync(workflowEvent, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
if (errors.Any()) |
||||
|
{ |
||||
|
var workflowExecutionErrors = errors.Select(x => x.ToWorkflowExecutionError(_currentTenant)); |
||||
|
|
||||
|
await _executionErrorRepository.InsertManyAsync(workflowExecutionErrors, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
if (!Guid.TryParse(workflow.Id, out Guid workflowId)) |
||||
|
{ |
||||
|
workflowId = _guidGenerator.Create(); |
||||
|
} |
||||
|
|
||||
|
var wf = await _workflowRepository.FindAsync(workflowId, includeDetails: true, cancellationToken: cancellationToken); |
||||
|
if (wf == null) |
||||
|
{ |
||||
|
wf = workflow.ToWorkflow(_guidGenerator, _currentTenant); |
||||
|
|
||||
|
await _workflowRepository.InsertAsync(wf, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
wf.Update(workflow, _guidGenerator, _currentTenant); |
||||
|
|
||||
|
await _workflowRepository.UpdateAsync(wf, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public virtual async Task ProcessCommands( |
||||
|
DateTimeOffset asOf, |
||||
|
Func<ScheduledCommand, Task> action, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var quertable = await _scheduledCommandRepository.GetQueryableAsync(); |
||||
|
var commands = await _asyncQueryableExecuter.ToListAsync( |
||||
|
quertable.Where(x => x.ExecuteTime < asOf.UtcDateTime.Ticks), |
||||
|
cancellationToken); |
||||
|
|
||||
|
foreach (var command in commands) |
||||
|
{ |
||||
|
await action(command.ToScheduledCommand()); |
||||
|
} |
||||
|
|
||||
|
await _scheduledCommandRepository.DeleteManyAsync(commands, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task ScheduleCommand(ScheduledCommand command) |
||||
|
{ |
||||
|
var workflowCommand = command.ToWorkflowScheduledCommand(_currentTenant); |
||||
|
|
||||
|
await _scheduledCommandRepository.InsertAsync(workflowCommand); |
||||
|
} |
||||
|
|
||||
|
public virtual async Task<bool> SetSubscriptionToken( |
||||
|
string eventSubscriptionId, |
||||
|
string token, |
||||
|
string workerId, |
||||
|
DateTime expiry, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var uid = Guid.Parse(eventSubscriptionId); |
||||
|
|
||||
|
var existingEntity = await _subscriptionRepository.GetAsync(uid, cancellationToken: cancellationToken); |
||||
|
|
||||
|
existingEntity.SetSubscriptionToken(token, workerId, expiry); |
||||
|
|
||||
|
await _subscriptionRepository.UpdateAsync(existingEntity, cancellationToken: cancellationToken); |
||||
|
|
||||
|
return true; |
||||
|
} |
||||
|
|
||||
|
public virtual async Task TerminateSubscription( |
||||
|
string eventSubscriptionId, |
||||
|
CancellationToken cancellationToken = default) |
||||
|
{ |
||||
|
var uid = Guid.Parse(eventSubscriptionId); |
||||
|
|
||||
|
var existingEntity = await _subscriptionRepository.GetAsync(uid, cancellationToken: cancellationToken); |
||||
|
|
||||
|
await _subscriptionRepository.DeleteAsync(existingEntity, cancellationToken: cancellationToken); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,9 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Repositories; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public interface IWorkflowEventRepository : IRepository<WorkflowEvent, Guid> |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,9 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Repositories; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public interface IWorkflowEventSubscriptionRepository : IRepository<WorkflowEventSubscription, Guid> |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,8 @@ |
|||||
|
using Volo.Abp.Domain.Repositories; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public interface IWorkflowExecutionErrorRepository : IRepository<WorkflowExecutionError, int> |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,21 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Volo.Abp.Domain.Repositories; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public interface IWorkflowRepository : IRepository<Workflow, Guid> |
||||
|
{ |
||||
|
Task<List<Workflow>> GetListAsync( |
||||
|
WorkflowStatus? status, |
||||
|
string type, |
||||
|
DateTime? createdFrom, |
||||
|
DateTime? createdTo, |
||||
|
int skip, |
||||
|
int take, |
||||
|
CancellationToken cancellationToken = default); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,8 @@ |
|||||
|
using Volo.Abp.Domain.Repositories; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public interface IWorkflowScheduledCommandRepository : IRepository<WorkflowScheduledCommand, long> |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,101 @@ |
|||||
|
using Newtonsoft.Json; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Collections.ObjectModel; |
||||
|
using System.Linq; |
||||
|
using Volo.Abp.Domain.Entities.Auditing; |
||||
|
using Volo.Abp.Guids; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class Workflow : AuditedAggregateRoot<Guid>, IMultiTenant |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
public virtual string WorkflowDefinitionId { get; protected set; } |
||||
|
public virtual int Version { get; protected set; } |
||||
|
public virtual string Description { get; protected set; } |
||||
|
public virtual string Reference { get; protected set; } |
||||
|
public virtual long? NextExecution { get; protected set; } |
||||
|
public virtual WorkflowStatus Status { get; protected set; } |
||||
|
public virtual string Data { get; protected set; } |
||||
|
public virtual DateTime? CompleteTime { get; protected set; } |
||||
|
public virtual ICollection<WorkflowExecutionPointer> ExecutionPointers { get; protected set; } |
||||
|
|
||||
|
protected Workflow() |
||||
|
{ |
||||
|
ExecutionPointers = new Collection<WorkflowExecutionPointer>(); |
||||
|
} |
||||
|
|
||||
|
public Workflow( |
||||
|
Guid id, |
||||
|
DateTime creationTime, |
||||
|
string defintionId, |
||||
|
string data, |
||||
|
int version, |
||||
|
string description, |
||||
|
string reference, |
||||
|
long? nextExecution = null, |
||||
|
WorkflowStatus status = WorkflowStatus.Terminated, |
||||
|
DateTime? completeTime = null, |
||||
|
Guid? tenantId = null) : base(id) |
||||
|
{ |
||||
|
Data = data; |
||||
|
CreationTime = creationTime; |
||||
|
WorkflowDefinitionId = defintionId; |
||||
|
Version = version; |
||||
|
Description = description; |
||||
|
Reference = reference; |
||||
|
NextExecution = nextExecution; |
||||
|
Status = status; |
||||
|
CompleteTime = completeTime; |
||||
|
TenantId = tenantId; |
||||
|
|
||||
|
ExecutionPointers = new Collection<WorkflowExecutionPointer>(); |
||||
|
} |
||||
|
|
||||
|
public void AddPointer(WorkflowExecutionPointer pointer) |
||||
|
{ |
||||
|
ExecutionPointers.Add(pointer); |
||||
|
} |
||||
|
|
||||
|
public WorkflowExecutionPointer FindPointer(Guid id) |
||||
|
{ |
||||
|
return ExecutionPointers.FirstOrDefault(point => point.Id.Equals(id)); |
||||
|
} |
||||
|
|
||||
|
public void Update( |
||||
|
WorkflowInstance instance, |
||||
|
IGuidGenerator guidGenerator, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
Data = JsonConvert.SerializeObject(instance.Data); |
||||
|
CreationTime = instance.CreateTime; |
||||
|
WorkflowDefinitionId = instance.WorkflowDefinitionId; |
||||
|
Version = instance.Version; |
||||
|
Description = instance.Description; |
||||
|
Reference = instance.Reference; |
||||
|
NextExecution = instance.NextExecution; |
||||
|
Status = instance.Status; |
||||
|
CompleteTime = instance.CompleteTime; |
||||
|
|
||||
|
foreach (var pointer in instance.ExecutionPointers) |
||||
|
{ |
||||
|
if (!Guid.TryParse(pointer.Id, out Guid pointerId)) |
||||
|
{ |
||||
|
pointerId = guidGenerator.Create(); |
||||
|
} |
||||
|
|
||||
|
var currentPointer = FindPointer(pointerId); |
||||
|
if (currentPointer != null) |
||||
|
{ |
||||
|
currentPointer.Update(pointer); |
||||
|
continue; |
||||
|
} |
||||
|
|
||||
|
AddPointer(pointer.ToWorkflowExecutionPointer(this, guidGenerator, currentTenant)); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,9 @@ |
|||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public static class WorkflowDbProperties |
||||
|
{ |
||||
|
public const string ConnectionStringName = "AbpWorkflowCore"; |
||||
|
|
||||
|
public static string TablePrefix = "WF_"; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,47 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Auditing; |
||||
|
using Volo.Abp.Domain.Entities; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class WorkflowEvent : Entity<Guid>, IMultiTenant, IHasCreationTime |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
/// <summary>
|
||||
|
/// 名称
|
||||
|
/// </summary>
|
||||
|
public virtual string EventName { get; protected set; } |
||||
|
/// <summary>
|
||||
|
/// Key
|
||||
|
/// </summary>
|
||||
|
public virtual string EventKey { get; protected set; } |
||||
|
/// <summary>
|
||||
|
/// 数据
|
||||
|
/// </summary>
|
||||
|
public virtual string EventData { get; protected set; } |
||||
|
/// <summary>
|
||||
|
/// 是否已处理
|
||||
|
/// </summary>
|
||||
|
public virtual bool IsProcessed { get; set; } |
||||
|
/// <summary>
|
||||
|
/// 建立时间
|
||||
|
/// </summary>
|
||||
|
public virtual DateTime CreationTime { get; protected set; } |
||||
|
protected WorkflowEvent() { } |
||||
|
public WorkflowEvent( |
||||
|
Guid id, |
||||
|
string name, |
||||
|
string key, |
||||
|
string data, |
||||
|
DateTime creationTime, |
||||
|
Guid? tenantId = null) : base(id) |
||||
|
{ |
||||
|
EventName = name; |
||||
|
EventKey = key; |
||||
|
EventData = data; |
||||
|
CreationTime = creationTime; |
||||
|
TenantId = tenantId; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,73 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Entities; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class WorkflowEventSubscription : Entity<Guid>, IMultiTenant |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
|
||||
|
public virtual Guid WorkflowId { get; protected set; } |
||||
|
|
||||
|
public virtual int StepId { get; protected set; } |
||||
|
|
||||
|
public virtual Guid ExecutionPointerId { get; protected set; } |
||||
|
|
||||
|
public virtual string EventName { get; protected set; } |
||||
|
|
||||
|
public virtual string EventKey { get; protected set; } |
||||
|
|
||||
|
public virtual DateTime SubscribeAsOf { get; protected set; } |
||||
|
|
||||
|
public virtual string SubscriptionData { get; protected set; } |
||||
|
|
||||
|
public virtual string ExternalToken { get; protected set; } |
||||
|
|
||||
|
public virtual string ExternalWorkerId { get; protected set; } |
||||
|
|
||||
|
public virtual DateTime? ExternalTokenExpiry { get; protected set; } |
||||
|
|
||||
|
protected WorkflowEventSubscription() |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public WorkflowEventSubscription( |
||||
|
Guid id, |
||||
|
Guid workflowId, |
||||
|
int stepId, |
||||
|
Guid pointerId, |
||||
|
string eventName, |
||||
|
string eventKey, |
||||
|
DateTime subscribeAsOf, |
||||
|
string subscriptionData, |
||||
|
string externalToken, |
||||
|
string externalWorkerId, |
||||
|
DateTime? externalTokenExpiry = null, |
||||
|
Guid? tenantId = null) : base(id) |
||||
|
{ |
||||
|
WorkflowId = workflowId; |
||||
|
StepId = stepId; |
||||
|
ExecutionPointerId = pointerId; |
||||
|
EventName = eventName; |
||||
|
EventKey = eventKey; |
||||
|
SubscribeAsOf = subscribeAsOf; |
||||
|
SubscriptionData = subscriptionData; |
||||
|
ExternalToken = externalToken; |
||||
|
ExternalWorkerId = externalWorkerId; |
||||
|
ExternalTokenExpiry = externalTokenExpiry; |
||||
|
|
||||
|
TenantId = tenantId; |
||||
|
} |
||||
|
|
||||
|
public void SetSubscriptionToken( |
||||
|
string token, |
||||
|
string workerId, |
||||
|
DateTime? expiry) |
||||
|
{ |
||||
|
ExternalToken = token; |
||||
|
ExternalWorkerId = workerId; |
||||
|
ExternalTokenExpiry = expiry; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,34 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Entities; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class WorkflowExecutionError : Entity<int>, IMultiTenant |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
|
||||
|
public virtual Guid WorkflowId { get; protected set; } |
||||
|
|
||||
|
public virtual Guid ExecutionPointerId { get; set; } |
||||
|
|
||||
|
public virtual DateTime ErrorTime { get; set; } |
||||
|
|
||||
|
public virtual string Message { get; set; } |
||||
|
|
||||
|
protected WorkflowExecutionError() { } |
||||
|
public WorkflowExecutionError( |
||||
|
Guid workflowId, |
||||
|
Guid executionPointerId, |
||||
|
DateTime errorTime, |
||||
|
string message, |
||||
|
Guid? tenantId = null) |
||||
|
{ |
||||
|
WorkflowId = workflowId; |
||||
|
ExecutionPointerId = executionPointerId; |
||||
|
ErrorTime = errorTime; |
||||
|
Message = message; |
||||
|
TenantId = tenantId; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,162 @@ |
|||||
|
using Newtonsoft.Json; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Collections.ObjectModel; |
||||
|
using System.Linq; |
||||
|
using Volo.Abp.Data; |
||||
|
using Volo.Abp.Domain.Entities; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class WorkflowExecutionPointer : Entity<Guid>, IMultiTenant |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
|
||||
|
public virtual Guid WorkflowId { get; protected set; } |
||||
|
|
||||
|
public virtual Workflow Workflow { get; protected set; } |
||||
|
|
||||
|
public virtual int StepId { get; protected set; } |
||||
|
|
||||
|
public virtual bool Active { get; protected set; } |
||||
|
|
||||
|
public virtual DateTime? SleepUntil { get; protected set; } |
||||
|
|
||||
|
public virtual string PersistenceData { get; protected set; } |
||||
|
|
||||
|
public virtual DateTime? StartTime { get; protected set; } |
||||
|
|
||||
|
public virtual DateTime? EndTime { get; protected set; } |
||||
|
|
||||
|
public virtual string EventName { get; protected set; } |
||||
|
|
||||
|
public virtual string EventKey { get; protected set; } |
||||
|
|
||||
|
public virtual bool EventPublished { get; protected set; } |
||||
|
|
||||
|
public virtual string EventData { get; protected set; } |
||||
|
|
||||
|
public virtual string StepName { get; protected set; } |
||||
|
|
||||
|
public virtual int RetryCount { get; protected set; } |
||||
|
|
||||
|
public virtual string Children { get; protected set; } |
||||
|
|
||||
|
public virtual string ContextItem { get; protected set; } |
||||
|
|
||||
|
public virtual string PredecessorId { get; protected set; } |
||||
|
|
||||
|
public virtual string Outcome { get; protected set; } |
||||
|
|
||||
|
public virtual PointerStatus Status { get; protected set; } |
||||
|
|
||||
|
public virtual string Scope { get; protected set; } |
||||
|
|
||||
|
public virtual ICollection<WorkflowExtensionAttribute> ExtensionAttributes { get; protected set; } |
||||
|
|
||||
|
protected WorkflowExecutionPointer() |
||||
|
{ |
||||
|
ExtensionAttributes = new Collection<WorkflowExtensionAttribute>(); |
||||
|
} |
||||
|
|
||||
|
public WorkflowExecutionPointer( |
||||
|
Guid id, |
||||
|
Guid workflowId, |
||||
|
int stepId, |
||||
|
string stepName, |
||||
|
bool active, |
||||
|
string persistenceData, |
||||
|
string eventName, |
||||
|
string eventKey, |
||||
|
bool eventPublished, |
||||
|
string eventData, |
||||
|
int retryCount, |
||||
|
string children, |
||||
|
string contextItem, |
||||
|
string predecessorId, |
||||
|
string outcome, |
||||
|
string scope, |
||||
|
PointerStatus status = PointerStatus.Legacy, |
||||
|
DateTime? sleepUntil = null, |
||||
|
DateTime? startTime = null, |
||||
|
DateTime? endTime = null, |
||||
|
Guid? tenantId = null) : base(id) |
||||
|
{ |
||||
|
WorkflowId = workflowId; |
||||
|
StepId = stepId; |
||||
|
StepName = stepName; |
||||
|
Active = active; |
||||
|
PersistenceData = persistenceData; |
||||
|
EventName = eventName; |
||||
|
EventKey = eventKey; |
||||
|
EventPublished = eventPublished; |
||||
|
EventData = eventData; |
||||
|
RetryCount = retryCount; |
||||
|
Children = children; |
||||
|
ContextItem = contextItem; |
||||
|
PredecessorId = predecessorId; |
||||
|
Outcome = outcome; |
||||
|
Scope = scope; |
||||
|
Status = status; |
||||
|
SleepUntil = sleepUntil; |
||||
|
StartTime = startTime; |
||||
|
EndTime = endTime; |
||||
|
|
||||
|
TenantId = tenantId; |
||||
|
|
||||
|
ExtensionAttributes = new Collection<WorkflowExtensionAttribute>(); |
||||
|
} |
||||
|
|
||||
|
public void Update(ExecutionPointer pointer) |
||||
|
{ |
||||
|
StepId = pointer.StepId; |
||||
|
StepName = pointer.StepName; |
||||
|
Active = pointer.Active; |
||||
|
PersistenceData = JsonConvert.SerializeObject(pointer.PersistenceData); |
||||
|
EventName = pointer.EventName; |
||||
|
EventKey = pointer.EventKey; |
||||
|
EventPublished = pointer.EventPublished; |
||||
|
EventData = JsonConvert.SerializeObject(pointer.EventData); |
||||
|
RetryCount = pointer.RetryCount; |
||||
|
Children = pointer.Children.JoinAsString(";"); |
||||
|
ContextItem = JsonConvert.SerializeObject(pointer.ContextItem); |
||||
|
PredecessorId = pointer.PredecessorId; |
||||
|
Outcome = JsonConvert.SerializeObject(pointer.Outcome); |
||||
|
Scope = pointer.Scope.JoinAsString(";"); |
||||
|
Status = pointer.Status; |
||||
|
SleepUntil = pointer.SleepUntil; |
||||
|
StartTime = pointer.StartTime; |
||||
|
EndTime = pointer.EndTime; |
||||
|
|
||||
|
foreach (var attribute in pointer.ExtensionAttributes) |
||||
|
{ |
||||
|
var findAttr = FindAttribute(attribute.Key); |
||||
|
if (findAttr == null) |
||||
|
{ |
||||
|
findAttr = new WorkflowExtensionAttribute(Id, attribute.Key, attribute.Value.SerializeObject()); |
||||
|
|
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
findAttr.Key = attribute.Key; |
||||
|
findAttr.Value = attribute.Value.SerializeObject(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public WorkflowExtensionAttribute AddAttribute(string key, string value) |
||||
|
{ |
||||
|
var attr = new WorkflowExtensionAttribute(Id, key, value); |
||||
|
ExtensionAttributes.Add(attr); |
||||
|
|
||||
|
return attr; |
||||
|
} |
||||
|
|
||||
|
public WorkflowExtensionAttribute FindAttribute(string key) |
||||
|
{ |
||||
|
return ExtensionAttributes.FirstOrDefault(x => x.Key.Equals(key)); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,31 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Entities; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class WorkflowExtensionAttribute : Entity<long>, IMultiTenant |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
|
||||
|
public virtual Guid ExecutionPointerId { get; set; } |
||||
|
|
||||
|
public virtual WorkflowExecutionPointer ExecutionPointer { get; set; } |
||||
|
|
||||
|
public virtual string Key { get; set; } |
||||
|
|
||||
|
public virtual string Value { get; set; } |
||||
|
|
||||
|
protected WorkflowExtensionAttribute() { } |
||||
|
|
||||
|
public WorkflowExtensionAttribute( |
||||
|
Guid pointerId, |
||||
|
string key, |
||||
|
string value) |
||||
|
{ |
||||
|
ExecutionPointerId = pointerId; |
||||
|
Key = key; |
||||
|
Value = value; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,113 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public static class WorkflowExtensions |
||||
|
{ |
||||
|
public static Event ToEvent(this WorkflowEvent workflowEvent) |
||||
|
{ |
||||
|
return new Event |
||||
|
{ |
||||
|
Id = workflowEvent.Id.ToString(), |
||||
|
EventName = workflowEvent.EventName, |
||||
|
EventKey = workflowEvent.EventKey, |
||||
|
EventTime = workflowEvent.CreationTime.ToUtcDateTime(), |
||||
|
IsProcessed = workflowEvent.IsProcessed, |
||||
|
EventData = workflowEvent.EventData.DeserializeObject() |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
public static EventSubscription ToEventSubscription(this WorkflowEventSubscription workflowEventSubscription) |
||||
|
{ |
||||
|
return new EventSubscription |
||||
|
{ |
||||
|
Id = workflowEventSubscription.Id.ToString(), |
||||
|
StepId = workflowEventSubscription.StepId, |
||||
|
SubscribeAsOf = workflowEventSubscription.SubscribeAsOf.ToUtcDateTime(), |
||||
|
SubscriptionData = workflowEventSubscription.SubscriptionData.DeserializeObject(), |
||||
|
EventKey = workflowEventSubscription.EventKey, |
||||
|
EventName = workflowEventSubscription.EventName, |
||||
|
ExecutionPointerId = workflowEventSubscription.ExecutionPointerId.ToString(), |
||||
|
ExternalWorkerId = workflowEventSubscription.ExternalWorkerId, |
||||
|
ExternalToken = workflowEventSubscription.ExternalToken, |
||||
|
ExternalTokenExpiry = workflowEventSubscription.ExternalTokenExpiry.ToNullableUtcDateTime(), |
||||
|
WorkflowId = workflowEventSubscription.WorkflowId.ToString() |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
public static WorkflowInstance ToWorkflowInstance(this Workflow workflow) |
||||
|
{ |
||||
|
return new WorkflowInstance |
||||
|
{ |
||||
|
Id = workflow.Id.ToString(), |
||||
|
WorkflowDefinitionId = workflow.WorkflowDefinitionId, |
||||
|
CompleteTime = workflow.CompleteTime.ToNullableUtcDateTime(), |
||||
|
CreateTime = workflow.CreationTime.ToUtcDateTime(), |
||||
|
Data = workflow.Data.SerializeObject(), |
||||
|
Status = workflow.Status, |
||||
|
Description = workflow.Description, |
||||
|
NextExecution = workflow.NextExecution, |
||||
|
Reference = workflow.Reference, |
||||
|
Version = workflow.Version, |
||||
|
ExecutionPointers = new ExecutionPointerCollection( |
||||
|
workflow.ExecutionPointers |
||||
|
.Select(pointer => pointer.ToExecutionPointer()) |
||||
|
.ToList()) |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
public static ExecutionPointer ToExecutionPointer(this WorkflowExecutionPointer pointer) |
||||
|
{ |
||||
|
return new ExecutionPointer |
||||
|
{ |
||||
|
Id = pointer.Id.ToString(), |
||||
|
EventData = pointer.EventData.DeserializeObject(), |
||||
|
EventKey = pointer.StepName, |
||||
|
EventName = pointer.EventName, |
||||
|
EventPublished = pointer.EventPublished, |
||||
|
ExtensionAttributes = pointer.ExtensionAttributes.ToExtensionAttributes(), |
||||
|
Active = pointer.Active, |
||||
|
Children = pointer.Children.Split(';').ToList(), |
||||
|
ContextItem = pointer.ContextItem.DeserializeObject(), |
||||
|
Scope = pointer.Scope.Split(';').ToList(), |
||||
|
Outcome = pointer.Outcome.DeserializeObject(), |
||||
|
PersistenceData = pointer.PersistenceData.DeserializeObject(), |
||||
|
PredecessorId = pointer.PredecessorId, |
||||
|
RetryCount = pointer.RetryCount, |
||||
|
Status = pointer.Status, |
||||
|
StepId = pointer.StepId, |
||||
|
StepName = pointer.StepName, |
||||
|
EndTime = pointer.EndTime.ToNullableUtcDateTime(), |
||||
|
StartTime = pointer.StartTime.ToNullableUtcDateTime(), |
||||
|
SleepUntil = pointer.SleepUntil.ToNullableUtcDateTime(), |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
public static ScheduledCommand ToScheduledCommand( |
||||
|
this WorkflowScheduledCommand command) |
||||
|
{ |
||||
|
return new ScheduledCommand |
||||
|
{ |
||||
|
CommandName = command.CommandName, |
||||
|
Data = command.Data, |
||||
|
ExecuteTime = command.ExecuteTime |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
public static Dictionary<string, object> ToExtensionAttributes( |
||||
|
this ICollection<WorkflowExtensionAttribute> attributes) |
||||
|
{ |
||||
|
var attrDic = new Dictionary<string, object>(); |
||||
|
|
||||
|
foreach (var attr in attributes) |
||||
|
{ |
||||
|
attrDic.Add(attr.Key, attr.Value.DeserializeObject()); |
||||
|
} |
||||
|
|
||||
|
return attrDic; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,29 @@ |
|||||
|
using System; |
||||
|
using Volo.Abp.Domain.Entities; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.Persistence |
||||
|
{ |
||||
|
public class WorkflowScheduledCommand : Entity<long>, IMultiTenant |
||||
|
{ |
||||
|
public virtual Guid? TenantId { get; protected set; } |
||||
|
|
||||
|
public virtual string CommandName { get; set; } |
||||
|
|
||||
|
public virtual string Data { get; set; } |
||||
|
|
||||
|
public virtual long ExecuteTime { get; set; } |
||||
|
protected WorkflowScheduledCommand() { } |
||||
|
public WorkflowScheduledCommand( |
||||
|
string commandName, |
||||
|
string data, |
||||
|
long executeTime, |
||||
|
Guid? tenantId = null) |
||||
|
{ |
||||
|
CommandName = commandName; |
||||
|
Data = data; |
||||
|
ExecuteTime = executeTime; |
||||
|
TenantId = tenantId; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,138 @@ |
|||||
|
using LINGYUN.Abp.WorkflowCore.Persistence; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using Volo.Abp.Guids; |
||||
|
using Volo.Abp.MultiTenancy; |
||||
|
|
||||
|
namespace WorkflowCore.Models |
||||
|
{ |
||||
|
public static class WorkflowExtensions |
||||
|
{ |
||||
|
public static Workflow ToWorkflow( |
||||
|
this WorkflowInstance instance, |
||||
|
IGuidGenerator generator, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
var workflow = new Workflow( |
||||
|
generator.Create(), |
||||
|
instance.CreateTime, |
||||
|
instance.WorkflowDefinitionId, |
||||
|
instance.Data.SerializeObject(), |
||||
|
instance.Version, |
||||
|
instance.Description, |
||||
|
instance.Reference, |
||||
|
instance.NextExecution, |
||||
|
instance.Status, |
||||
|
instance.CompleteTime, |
||||
|
currentTenant.Id); |
||||
|
|
||||
|
foreach (var pointer in instance.ExecutionPointers) |
||||
|
{ |
||||
|
pointer.ToWorkflowExecutionPointer(workflow, generator, currentTenant); |
||||
|
} |
||||
|
|
||||
|
return workflow; |
||||
|
} |
||||
|
|
||||
|
public static WorkflowExecutionPointer ToWorkflowExecutionPointer( |
||||
|
this ExecutionPointer executionPointer, |
||||
|
Workflow workflow, |
||||
|
IGuidGenerator generator, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
var pointer = new WorkflowExecutionPointer( |
||||
|
generator.Create(), |
||||
|
workflow.Id, |
||||
|
executionPointer.StepId, |
||||
|
executionPointer.StepName, |
||||
|
executionPointer.Active, |
||||
|
executionPointer.PersistenceData.SerializeObject(), |
||||
|
executionPointer.EventName, |
||||
|
executionPointer.EventKey, |
||||
|
executionPointer.EventPublished, |
||||
|
executionPointer.EventData.SerializeObject(), |
||||
|
executionPointer.RetryCount, |
||||
|
executionPointer.Children.JoinAsString(";"), |
||||
|
executionPointer.ContextItem.SerializeObject(), |
||||
|
executionPointer.PredecessorId, |
||||
|
executionPointer.Outcome.SerializeObject(), |
||||
|
executionPointer.Scope.JoinAsString(";"), |
||||
|
executionPointer.Status, |
||||
|
executionPointer.SleepUntil, |
||||
|
executionPointer.StartTime, |
||||
|
executionPointer.EndTime, |
||||
|
currentTenant.Id); |
||||
|
|
||||
|
foreach (var attribute in executionPointer.ExtensionAttributes) |
||||
|
{ |
||||
|
pointer.AddAttribute(attribute.Key, attribute.Value.SerializeObject()); |
||||
|
} |
||||
|
|
||||
|
executionPointer.Id = pointer.Id.ToString(); |
||||
|
|
||||
|
return pointer; |
||||
|
} |
||||
|
|
||||
|
public static WorkflowEvent ToWorkflowEvent( |
||||
|
this Event @event, |
||||
|
IGuidGenerator generator, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
var we = new WorkflowEvent( |
||||
|
generator.Create(), |
||||
|
@event.EventName, |
||||
|
@event.EventKey, |
||||
|
@event.EventData.SerializeObject(), |
||||
|
@event.EventTime, |
||||
|
currentTenant.Id) |
||||
|
{ |
||||
|
IsProcessed = @event.IsProcessed |
||||
|
}; |
||||
|
|
||||
|
return we; |
||||
|
} |
||||
|
|
||||
|
public static WorkflowEventSubscription ToWorkflowEventSubscription( |
||||
|
this EventSubscription subscription, |
||||
|
IGuidGenerator generator, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
return new WorkflowEventSubscription( |
||||
|
generator.Create(), |
||||
|
Guid.Parse(subscription.WorkflowId), |
||||
|
subscription.StepId, |
||||
|
Guid.Parse(subscription.ExecutionPointerId), |
||||
|
subscription.EventName, |
||||
|
subscription.EventKey, |
||||
|
subscription.SubscribeAsOf, |
||||
|
subscription.SubscriptionData.SerializeObject(), |
||||
|
subscription.ExternalToken, |
||||
|
subscription.ExternalWorkerId, |
||||
|
subscription.ExternalTokenExpiry, |
||||
|
currentTenant.Id); |
||||
|
} |
||||
|
|
||||
|
public static WorkflowExecutionError ToWorkflowExecutionError( |
||||
|
this ExecutionError executionError, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
return new WorkflowExecutionError( |
||||
|
Guid.Parse(executionError.WorkflowId), |
||||
|
Guid.Parse(executionError.ExecutionPointerId), |
||||
|
executionError.ErrorTime, |
||||
|
executionError.Message, |
||||
|
currentTenant.Id); |
||||
|
} |
||||
|
|
||||
|
public static WorkflowScheduledCommand ToWorkflowScheduledCommand( |
||||
|
this ScheduledCommand command, |
||||
|
ICurrentTenant currentTenant) |
||||
|
{ |
||||
|
return new WorkflowScheduledCommand( |
||||
|
command.CommandName, |
||||
|
command.Data, |
||||
|
command.ExecuteTime, |
||||
|
currentTenant.Id); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,16 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>netstandard2.0</TargetFramework> |
||||
|
<RootNamespace /> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<PackageReference Include="Volo.Abp.RabbitMQ" Version="4.4.0" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<ProjectReference Include="..\LINGYUN.Abp.WorkflowCore\LINGYUN.Abp.WorkflowCore.csproj" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
</Project> |
||||
@ -0,0 +1,26 @@ |
|||||
|
namespace LINGYUN.Abp.WorkflowCore.RabbitMQ |
||||
|
{ |
||||
|
public class AbpRabbitMQWorkflowCoreOptions |
||||
|
{ |
||||
|
/// <summary>
|
||||
|
/// Default value: "AbpWorkflows.".
|
||||
|
/// </summary>
|
||||
|
public string DefaultQueueNamePrefix { get; set; } |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Default value: "AbpWorkflowCore".
|
||||
|
/// </summary>
|
||||
|
public string DefaultConnectionName { get; set; } |
||||
|
/// <summary>
|
||||
|
/// Default valu: "AbpWorkflowCore"
|
||||
|
/// </summary>
|
||||
|
public string DefaultChannelName { get; set; } |
||||
|
|
||||
|
public AbpRabbitMQWorkflowCoreOptions() |
||||
|
{ |
||||
|
DefaultQueueNamePrefix = "AbpWorkflows."; |
||||
|
DefaultConnectionName = "AbpWorkflowCore"; |
||||
|
DefaultChannelName = "AbpWorkflowCore"; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,152 @@ |
|||||
|
using Microsoft.Extensions.Logging; |
||||
|
using Microsoft.Extensions.Logging.Abstractions; |
||||
|
using Microsoft.Extensions.Options; |
||||
|
using RabbitMQ.Client; |
||||
|
using System.Text; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Volo.Abp; |
||||
|
using Volo.Abp.RabbitMQ; |
||||
|
using Volo.Abp.Threading; |
||||
|
using WorkflowCore.Interface; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.RabbitMQ |
||||
|
{ |
||||
|
public class AbpRabbitMqQueueProvider : IQueueProvider |
||||
|
{ |
||||
|
protected bool IsDiposed { get; private set; } |
||||
|
protected SemaphoreSlim SyncObj = new SemaphoreSlim(1, 1); |
||||
|
|
||||
|
protected IChannelAccessor ChannelAccessor { get; private set; } |
||||
|
|
||||
|
protected IChannelPool ChannelPool { get; } |
||||
|
protected IQueueNameNormalizer QueueNameNormalizer { get; } |
||||
|
protected AbpRabbitMQWorkflowCoreOptions RabbitMQWorkflowCoreOptions { get; } |
||||
|
|
||||
|
public ILogger<AbpRabbitMqQueueProvider> Logger { get; set; } |
||||
|
|
||||
|
public bool IsDequeueBlocking => false; |
||||
|
|
||||
|
public AbpRabbitMqQueueProvider( |
||||
|
IChannelPool channelPool, |
||||
|
IQueueNameNormalizer queueNameNormalizer, |
||||
|
IOptions<AbpRabbitMQWorkflowCoreOptions> options) |
||||
|
{ |
||||
|
ChannelPool = channelPool; |
||||
|
QueueNameNormalizer = queueNameNormalizer; |
||||
|
RabbitMQWorkflowCoreOptions = options.Value; |
||||
|
|
||||
|
Logger = NullLogger<AbpRabbitMqQueueProvider>.Instance; |
||||
|
} |
||||
|
|
||||
|
public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken) |
||||
|
{ |
||||
|
CheckDisposed(); |
||||
|
|
||||
|
using (await SyncObj.LockAsync(cancellationToken)) |
||||
|
{ |
||||
|
await EnsureInitializedAsync(); |
||||
|
|
||||
|
ChannelAccessor.Channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); |
||||
|
|
||||
|
var msg = ChannelAccessor.Channel.BasicGet(QueueNameNormalizer.NormalizeKey(queue), false); |
||||
|
if (msg != null) |
||||
|
{ |
||||
|
var data = Encoding.UTF8.GetString(msg.Body.ToArray()); |
||||
|
ChannelAccessor.Channel.BasicAck(msg.DeliveryTag, false); |
||||
|
return data; |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public async Task QueueWork(string id, QueueType queue) |
||||
|
{ |
||||
|
CheckDisposed(); |
||||
|
|
||||
|
using (await SyncObj.LockAsync()) |
||||
|
{ |
||||
|
await EnsureInitializedAsync(); |
||||
|
|
||||
|
var body = Encoding.UTF8.GetBytes(id); |
||||
|
|
||||
|
ChannelAccessor.Channel.BasicPublish( |
||||
|
exchange: "", |
||||
|
routingKey: QueueNameNormalizer.NormalizeKey(queue), |
||||
|
basicProperties: null, |
||||
|
body: body |
||||
|
); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public async Task Start() |
||||
|
{ |
||||
|
CheckDisposed(); |
||||
|
|
||||
|
using (await SyncObj.LockAsync()) |
||||
|
{ |
||||
|
await EnsureInitializedAsync(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public Task Stop() |
||||
|
{ |
||||
|
Dispose(); |
||||
|
|
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
if (IsDiposed) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
IsDiposed = true; |
||||
|
|
||||
|
ChannelAccessor?.Dispose(); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
protected virtual Task EnsureInitializedAsync() |
||||
|
{ |
||||
|
if (ChannelAccessor != null) |
||||
|
{ |
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
ChannelAccessor = ChannelPool.Acquire( |
||||
|
RabbitMQWorkflowCoreOptions.DefaultChannelName, |
||||
|
RabbitMQWorkflowCoreOptions.DefaultConnectionName |
||||
|
); |
||||
|
|
||||
|
CreateDeclareWorkflowQueue(QueueType.Event); |
||||
|
CreateDeclareWorkflowQueue(QueueType.Workflow); |
||||
|
CreateDeclareWorkflowQueue(QueueType.Index); |
||||
|
|
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
protected virtual void CreateDeclareWorkflowQueue(QueueType queue) |
||||
|
{ |
||||
|
var queueName = QueueNameNormalizer.NormalizeKey(queue); |
||||
|
var configuration = new WorkflowQueueConfiguration( |
||||
|
queueName: queueName, |
||||
|
durable: true, |
||||
|
exclusive: false, |
||||
|
autoDelete: false); |
||||
|
|
||||
|
configuration.Declare(ChannelAccessor.Channel); |
||||
|
} |
||||
|
|
||||
|
protected void CheckDisposed() |
||||
|
{ |
||||
|
if (IsDiposed) |
||||
|
{ |
||||
|
throw new AbpException("This object is disposed!"); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,32 @@ |
|||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using Volo.Abp; |
||||
|
using Volo.Abp.Modularity; |
||||
|
using Volo.Abp.RabbitMQ; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.RabbitMQ |
||||
|
{ |
||||
|
[DependsOn(typeof(AbpRabbitMqModule))] |
||||
|
[DependsOn(typeof(AbpWorkflowCoreModule))] |
||||
|
public class AbpWorkflowCoreRabbitMQModule : AbpModule |
||||
|
{ |
||||
|
public override void PreConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
context.Services.AddSingleton<IQueueProvider, AbpRabbitMqQueueProvider>(); |
||||
|
context.Services.AddSingleton<AbpRabbitMqQueueProvider>(); |
||||
|
|
||||
|
PreConfigure<WorkflowOptions>(options => |
||||
|
{ |
||||
|
options.UseQueueProvider(provider => provider.GetRequiredService<AbpRabbitMqQueueProvider>()); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public override void OnApplicationShutdown(ApplicationShutdownContext context) |
||||
|
{ |
||||
|
context.ServiceProvider |
||||
|
.GetRequiredService<AbpRabbitMqQueueProvider>() |
||||
|
.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,9 @@ |
|||||
|
using WorkflowCore.Interface; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.RabbitMQ |
||||
|
{ |
||||
|
public interface IQueueNameNormalizer |
||||
|
{ |
||||
|
string NormalizeKey(QueueType queue); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,32 @@ |
|||||
|
using Microsoft.Extensions.Options; |
||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using WorkflowCore.Interface; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.RabbitMQ |
||||
|
{ |
||||
|
public class QueueNameNormalizer : IQueueNameNormalizer, ISingletonDependency |
||||
|
{ |
||||
|
protected AbpRabbitMQWorkflowCoreOptions RabbitMQWorkflowCoreOptions { get; } |
||||
|
|
||||
|
public QueueNameNormalizer( |
||||
|
IOptions<AbpRabbitMQWorkflowCoreOptions> options) |
||||
|
{ |
||||
|
RabbitMQWorkflowCoreOptions = options.Value; |
||||
|
} |
||||
|
|
||||
|
public string NormalizeKey(QueueType queue) |
||||
|
{ |
||||
|
switch (queue) |
||||
|
{ |
||||
|
case QueueType.Workflow: |
||||
|
return RabbitMQWorkflowCoreOptions.DefaultQueueNamePrefix + "wfc.workflow_queue"; |
||||
|
case QueueType.Event: |
||||
|
return RabbitMQWorkflowCoreOptions.DefaultQueueNamePrefix + "wfc.event_queue"; |
||||
|
case QueueType.Index: |
||||
|
return RabbitMQWorkflowCoreOptions.DefaultQueueNamePrefix + "wfc.index_queue"; |
||||
|
default: |
||||
|
return null; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,17 @@ |
|||||
|
using Volo.Abp.RabbitMQ; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore.RabbitMQ |
||||
|
{ |
||||
|
public class WorkflowQueueConfiguration : QueueDeclareConfiguration |
||||
|
{ |
||||
|
public WorkflowQueueConfiguration( |
||||
|
string queueName, |
||||
|
bool durable = true, |
||||
|
bool exclusive = false, |
||||
|
bool autoDelete = false, |
||||
|
string deadLetterQueueName = null) |
||||
|
: base(queueName, durable, exclusive, autoDelete, deadLetterQueueName) |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>netstandard2.0</TargetFramework> |
||||
|
<LangVersion>8.0</LangVersion> |
||||
|
<RootNamespace /> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<PackageReference Include="Volo.Abp.Core" Version="4.4.0" /> |
||||
|
<PackageReference Include="WorkflowCore.DSL" Version="3.6.1" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
</Project> |
||||
@ -0,0 +1,11 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Text; |
||||
|
using Volo.Abp.DependencyInjection; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public class AbpWorkflowCoreConventionalRegistrar : DefaultConventionalRegistrar |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,61 @@ |
|||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using Volo.Abp; |
||||
|
using Volo.Abp.Modularity; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Services; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public class AbpWorkflowCoreModule : AbpModule |
||||
|
{ |
||||
|
private readonly static IList<Type> _definitionWorkflows = new List<Type>(); |
||||
|
|
||||
|
public override void PreConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
AutoAddDefinitionWorkflows(context.Services); |
||||
|
} |
||||
|
|
||||
|
public override void ConfigureServices(ServiceConfigurationContext context) |
||||
|
{ |
||||
|
context.Services.AddWorkflow(options => |
||||
|
{ |
||||
|
context.Services.ExecutePreConfiguredActions(options); |
||||
|
}); |
||||
|
context.Services.AddWorkflowDSL(); |
||||
|
//context.Services.AddHostedService((provider) => provider.GetRequiredService<IWorkflowHost>());
|
||||
|
} |
||||
|
|
||||
|
public override void OnApplicationInitialization(ApplicationInitializationContext context) |
||||
|
{ |
||||
|
var workflowRegistry = context.ServiceProvider.GetRequiredService<IWorkflowRegistry>(); |
||||
|
|
||||
|
foreach (var definitionWorkflow in _definitionWorkflows) |
||||
|
{ |
||||
|
var workflow = context.ServiceProvider.GetRequiredService(definitionWorkflow); |
||||
|
workflowRegistry.RegisterWorkflow(workflow as WorkflowBase); |
||||
|
} |
||||
|
|
||||
|
var workflowHost = context.ServiceProvider.GetRequiredService<IWorkflowHost>(); |
||||
|
workflowHost.Start(); |
||||
|
} |
||||
|
|
||||
|
public override void OnApplicationShutdown(ApplicationShutdownContext context) |
||||
|
{ |
||||
|
var workflowHost = context.ServiceProvider.GetRequiredService<IWorkflowHost>(); |
||||
|
workflowHost.Stop(); |
||||
|
} |
||||
|
|
||||
|
private static void AutoAddDefinitionWorkflows(IServiceCollection services) |
||||
|
{ |
||||
|
services.OnRegistred(context => |
||||
|
{ |
||||
|
if (typeof(WorkflowBase).IsAssignableFrom(context.ImplementationType)) |
||||
|
{ |
||||
|
_definitionWorkflows.Add(context.ImplementationType); |
||||
|
} |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,10 @@ |
|||||
|
using Volo.Abp.Collections; |
||||
|
using WorkflowCore.Interface; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public class AbpWorkflowCoreOptions |
||||
|
{ |
||||
|
public ITypeList<IWorkflow> DefinitionProviders { get; } |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,6 @@ |
|||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public interface IWorkflowEnabled |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,10 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Text; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public interface IWorkflowManager |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using WorkflowCore.Interface; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public class NullStepBody : StepBody, ITransientDependency |
||||
|
{ |
||||
|
public override ExecutionResult Run(IStepExecutionContext context) |
||||
|
{ |
||||
|
return ExecutionResult.Next(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,14 @@ |
|||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using WorkflowCore.Interface; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public abstract class WorkflowBase : IWorkflow, ISingletonDependency |
||||
|
{ |
||||
|
public abstract string Id { get; } |
||||
|
|
||||
|
public abstract int Version { get; } |
||||
|
|
||||
|
public abstract void Build(IWorkflowBuilder<object> builder); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,18 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public class WorkflowDefine |
||||
|
{ |
||||
|
public string Id { get; set; } |
||||
|
public int Version { get; set; } |
||||
|
public string Name { get; set; } |
||||
|
public Type DataType { get; set; } |
||||
|
public List<WorkflowStepBody> Steps { get; set; } |
||||
|
public WorkflowDefine() |
||||
|
{ |
||||
|
Steps = new List<WorkflowStepBody>(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,31 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using WorkflowCore.Models; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public class WorkflowStepBody |
||||
|
{ |
||||
|
public string Id { get; set; } |
||||
|
public string Name { get; set; } |
||||
|
public Type StepType { get; set; } |
||||
|
public bool Saga { get; set; } |
||||
|
public string DisplayName { get; set; } |
||||
|
public string NextStep { get; set; } |
||||
|
public string CancelCondition { get; set; } |
||||
|
public TimeSpan? RetryInterval { get; set; } |
||||
|
public WorkflowErrorHandling? ErrorBehavior { get; set; } |
||||
|
public List<WorkflowStepBody> CompensateWith { get; set; } |
||||
|
public Dictionary<string, object> Inputs { get; set; } |
||||
|
public Dictionary<string, object> Outputs { get; set; } |
||||
|
public Dictionary<string, object> SelectNextStep { get; set; } |
||||
|
|
||||
|
public WorkflowStepBody() |
||||
|
{ |
||||
|
CompensateWith = new List<WorkflowStepBody>(); |
||||
|
Inputs = new Dictionary<string, object>(); |
||||
|
Outputs = new Dictionary<string, object>(); |
||||
|
SelectNextStep = new Dictionary<string, object>(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,19 @@ |
|||||
|
using Newtonsoft.Json; |
||||
|
|
||||
|
namespace System |
||||
|
{ |
||||
|
public static class ObjectSerializerExtensions |
||||
|
{ |
||||
|
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All }; |
||||
|
|
||||
|
public static string SerializeObject(this object obj, JsonSerializerSettings serializerSettings = null) |
||||
|
{ |
||||
|
return JsonConvert.SerializeObject(obj, serializerSettings ?? SerializerSettings); |
||||
|
} |
||||
|
|
||||
|
public static object DeserializeObject(this string str, JsonSerializerSettings serializerSettings = null) |
||||
|
{ |
||||
|
return JsonConvert.DeserializeObject(str, serializerSettings ?? SerializerSettings); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,19 @@ |
|||||
|
namespace System |
||||
|
{ |
||||
|
public static class UtcDateTimeExtensions |
||||
|
{ |
||||
|
public static DateTime? ToNullableUtcDateTime(this DateTime? dateTime) |
||||
|
{ |
||||
|
if (dateTime.HasValue) |
||||
|
{ |
||||
|
return dateTime.Value.ToUtcDateTime(); |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
public static DateTime ToUtcDateTime(this DateTime dateTime) |
||||
|
{ |
||||
|
return DateTime.SpecifyKind(dateTime, DateTimeKind.Utc); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,18 @@ |
|||||
|
<Project Sdk="Microsoft.NET.Sdk"> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<TargetFramework>net5.0</TargetFramework> |
||||
|
<RootNamespace /> |
||||
|
<IsPackable>false</IsPackable> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
<ItemGroup> |
||||
|
<ProjectReference Include="..\..\modules\workflow\LINGYUN.Abp.WorkflowCore\LINGYUN.Abp.WorkflowCore.csproj" /> |
||||
|
<ProjectReference Include="..\LINGYUN.Abp.TestBase\LINGYUN.Abp.TestsBase.csproj" /> |
||||
|
</ItemGroup> |
||||
|
|
||||
|
</Project> |
||||
@ -0,0 +1,8 @@ |
|||||
|
using LINGYUN.Abp.Tests; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
public abstract class AbpWorkflowCoreTestBase : AbpTestsBase<AbpWorkflowCoreTestModule> |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,11 @@ |
|||||
|
using LINGYUN.Abp.Tests; |
||||
|
using Volo.Abp.Modularity; |
||||
|
|
||||
|
namespace LINGYUN.Abp.WorkflowCore |
||||
|
{ |
||||
|
[DependsOn(typeof(AbpTestsBaseModule))] |
||||
|
[DependsOn(typeof(AbpWorkflowCoreModule))] |
||||
|
public class AbpWorkflowCoreTestModule : AbpModule |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue