Browse Source

feat(workflow): use uniform elasticsearch index

pull/432/head
cKey 4 years ago
parent
commit
a2ea908de8
  1. 1
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj
  2. 2
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs
  3. 10
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs
  4. 87
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs
  5. 9
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs
  6. 7
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs
  7. 11
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs
  8. 163
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs
  9. 21
      aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs

1
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj

@ -10,6 +10,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Volo.Abp.Guids" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.Guids" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Json" Version="$(VoloAbpPackageVersion)" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

2
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs

@ -1,5 +1,6 @@
using LINGYUN.Abp.Elasticsearch; using LINGYUN.Abp.Elasticsearch;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Json;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using WorkflowCore.Interface; using WorkflowCore.Interface;
using WorkflowCore.Models; using WorkflowCore.Models;
@ -7,6 +8,7 @@ using WorkflowCore.Models;
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{ {
[DependsOn(typeof(AbpWorkflowCoreModule))] [DependsOn(typeof(AbpWorkflowCoreModule))]
[DependsOn(typeof(AbpJsonModule))]
[DependsOn(typeof(AbpElasticsearchModule))] [DependsOn(typeof(AbpElasticsearchModule))]
public class AbpWorkflowCorePersistenceElasticsearchModule : AbpModule public class AbpWorkflowCorePersistenceElasticsearchModule : AbpModule
{ {

10
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs

@ -1,14 +1,18 @@
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch using Nest;
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{ {
public class AbpWorkflowCorePersistenceElasticsearchOptions public class AbpWorkflowCorePersistenceElasticsearchOptions
{ {
/// <summary> /// <summary>
/// Default Value: abp.workflows.persistence /// Default Value: abp.workflows.persistence.{0}
/// </summary> /// </summary>
public string IndexFormat { get; set; } public string IndexFormat { get; set; }
public IIndexSettings IndexSettings { get; set; }
public AbpWorkflowCorePersistenceElasticsearchOptions() public AbpWorkflowCorePersistenceElasticsearchOptions()
{ {
IndexFormat = "abp.workflows.persistence"; IndexFormat = "abp.workflows.persistence.{0}";
IndexSettings = new IndexSettings();
} }
} }
} }

87
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs

@ -1,7 +1,6 @@
using LINGYUN.Abp.Elasticsearch; using LINGYUN.Abp.Elasticsearch;
using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models; using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Nest; using Nest;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
@ -11,6 +10,7 @@ using System.Threading.Tasks;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids; using Volo.Abp.Guids;
using Volo.Abp.Threading;
using WorkflowCore.Interface; using WorkflowCore.Interface;
using WorkflowCore.Models; using WorkflowCore.Models;
@ -21,18 +21,21 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
private readonly ILogger<ElasticsearchPersistenceProvider> _logger; private readonly ILogger<ElasticsearchPersistenceProvider> _logger;
private readonly IGuidGenerator _guidGenerator; private readonly IGuidGenerator _guidGenerator;
private readonly IPersistenceIndexNameNormalizer _indexNameNormalizer;
private readonly IPersistenceIndexInitializer _indexInitializer;
private readonly IElasticsearchClientFactory _elasticsearchClientFactory; private readonly IElasticsearchClientFactory _elasticsearchClientFactory;
private readonly AbpWorkflowCorePersistenceElasticsearchOptions _options;
public ElasticsearchPersistenceProvider( public ElasticsearchPersistenceProvider(
IGuidGenerator guidGenerator, IGuidGenerator guidGenerator,
IElasticsearchClientFactory elasticsearchClientFactory, IElasticsearchClientFactory elasticsearchClientFactory,
IOptions<AbpWorkflowCorePersistenceElasticsearchOptions> options, IPersistenceIndexInitializer indexInitializer,
IPersistenceIndexNameNormalizer indexNameNormalizer,
ILogger<ElasticsearchPersistenceProvider> logger) ILogger<ElasticsearchPersistenceProvider> logger)
{ {
_guidGenerator = guidGenerator; _guidGenerator = guidGenerator;
_elasticsearchClientFactory = elasticsearchClientFactory; _elasticsearchClientFactory = elasticsearchClientFactory;
_options = options.Value; _indexInitializer = indexInitializer;
_indexNameNormalizer = indexNameNormalizer;
_logger = logger; _logger = logger;
} }
@ -46,7 +49,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync<EventSubscription>( var response = await client.GetAsync<EventSubscription>(
id, id,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -63,7 +66,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.UpdateAsync<EventSubscription>( await client.UpdateAsync<EventSubscription>(
id, id,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Doc(response.Source), .Doc(response.Source),
ct: cancellationToken); ct: cancellationToken);
} }
@ -79,7 +82,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync( var response = await client.IndexAsync(
newEvent, newEvent,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex))
.Id(newEventId), .Id(newEventId),
ct: cancellationToken); ct: cancellationToken);
@ -98,7 +101,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync( var response = await client.IndexAsync(
subscription, subscription,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Id(newSubscriptionId), .Id(newSubscriptionId),
ct: cancellationToken); ct: cancellationToken);
@ -117,7 +120,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync( var response = await client.IndexAsync(
workflow, workflow,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Id(newWorkflowId), .Id(newWorkflowId),
ct: cancellationToken); ct: cancellationToken);
@ -128,14 +131,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public void EnsureStoreExists() public void EnsureStoreExists()
{ {
// TODO: 为什么是同步API... AsyncHelper.RunSync(async () => await _indexInitializer.InitializeAsync());
var client = CreateClient();
var response = client.Indices.Exists(CreateIndex());
if (!response.Exists)
{
client.Indices.Create(CreateIndex());
}
} }
public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = default) public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = default)
@ -146,7 +142,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync<Event>( var response = await client.GetAsync<Event>(
eventId, eventId,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex)),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -165,7 +161,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).GreaterThanOrEquals(asOf))); terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).GreaterThanOrEquals(asOf)));
var response = await client.SearchAsync<Event>( var response = await client.SearchAsync<Event>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex))
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))), .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))),
ct: cancellationToken); ct: cancellationToken);
@ -187,7 +183,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(asOf))); terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(asOf)));
var response = await client.SearchAsync<EventSubscription>( var response = await client.SearchAsync<EventSubscription>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))) .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword"))))
.Sort(s => s.Field(f => f.SubscribeAsOf, SortOrder.Ascending)) .Sort(s => s.Field(f => f.SubscribeAsOf, SortOrder.Ascending))
@ -210,7 +206,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).LessThanOrEquals(now))); terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).LessThanOrEquals(now)));
var response = await client.SearchAsync<Event>( var response = await client.SearchAsync<Event>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex))
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))), .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))),
ct: cancellationToken); ct: cancellationToken);
@ -231,7 +227,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.Term(t => t.Field(f => f.Status).Value(WorkflowStatus.Runnable))); terms.Add(x => x.Term(t => t.Field(f => f.Status).Value(WorkflowStatus.Runnable)));
var response = await client.SearchAsync<WorkflowInstance>( var response = await client.SearchAsync<WorkflowInstance>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))), .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))),
ct: cancellationToken); ct: cancellationToken);
@ -249,7 +245,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync<EventSubscription>( var response = await client.GetAsync<EventSubscription>(
id, id,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -269,7 +265,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(now))); terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(now)));
var response = await client.SearchAsync<EventSubscription>( var response = await client.SearchAsync<EventSubscription>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.IncludeAll()), .Source(s => s.IncludeAll()),
ct: cancellationToken); ct: cancellationToken);
@ -286,7 +282,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync<WorkflowInstance>( var response = await client.GetAsync<WorkflowInstance>(
workflowId, workflowId,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -318,7 +314,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
} }
var response = await client.SearchAsync<WorkflowInstance>( var response = await client.SearchAsync<WorkflowInstance>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.IncludeAll()) .Source(s => s.IncludeAll())
.Skip(skip) .Skip(skip)
@ -334,7 +330,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var client = CreateClient(); var client = CreateClient();
var response = await client.SearchAsync<WorkflowInstance>( var response = await client.SearchAsync<WorkflowInstance>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Query(q => .Query(q =>
q.Bool(b => q.Bool(b =>
b.Should(s => b.Should(s =>
@ -350,12 +346,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
{ {
var eventId = Guid.Parse(id); var eventId = Guid.Parse(id);
var indexName = CreateIndex(PersistenceIndexConsts.EventIndex);
var client = CreateClient(); var client = CreateClient();
var response = await client.GetAsync<Event>( var response = await client.GetAsync<Event>(
eventId, eventId,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(indexName),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -366,7 +363,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.UpdateAsync<Event>( await client.UpdateAsync<Event>(
id, id,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(indexName)
.Doc(response.Source), .Doc(response.Source),
ct: cancellationToken); ct: cancellationToken);
} }
@ -375,12 +372,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
{ {
var eventId = Guid.Parse(id); var eventId = Guid.Parse(id);
var indexName = CreateIndex(PersistenceIndexConsts.EventIndex);
var client = CreateClient(); var client = CreateClient();
var response = await client.GetAsync<Event>( var response = await client.GetAsync<Event>(
eventId, eventId,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(indexName),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -391,7 +389,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.UpdateAsync<Event>( await client.UpdateAsync<Event>(
id, id,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(indexName)
.Doc(response.Source), .Doc(response.Source),
ct: cancellationToken); ct: cancellationToken);
} }
@ -406,7 +404,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexManyAsync( var response = await client.IndexManyAsync(
errors, errors,
CreateIndex(), CreateIndex(PersistenceIndexConsts.ExecutionErrorIndex),
cancellationToken: cancellationToken); cancellationToken: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -416,19 +414,20 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{ {
var workflowId = Guid.Parse(workflow.Id); var workflowId = Guid.Parse(workflow.Id);
var indexName = CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex);
var client = CreateClient(); var client = CreateClient();
var response = await client.GetAsync<WorkflowInstance>( var response = await client.GetAsync<WorkflowInstance>(
workflowId, workflowId,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(indexName),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
await client.UpdateAsync<WorkflowInstance>( await client.UpdateAsync<WorkflowInstance>(
workflowId, workflowId,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(indexName)
.Doc(workflow), .Doc(workflow),
ct: cancellationToken); ct: cancellationToken);
} }
@ -436,13 +435,14 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default) public virtual async Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
{ {
var client = CreateClient(); var client = CreateClient();
var indexName = CreateIndex(PersistenceIndexConsts.ScheduledCommandIndex);
var terms = new List<Func<QueryContainerDescriptor<PersistedScheduledCommand>, QueryContainer>>(); var terms = new List<Func<QueryContainerDescriptor<PersistedScheduledCommand>, QueryContainer>>();
terms.Add(x => x.LongRange(t => t.Field(f => f.ExecuteTime).LessThan(asOf.Ticks))); terms.Add(x => x.LongRange(t => t.Field(f => f.ExecuteTime).LessThan(asOf.Ticks)));
var response = await client.SearchAsync<PersistedScheduledCommand>( var response = await client.SearchAsync<PersistedScheduledCommand>(
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(indexName)
.Query(q => q.Bool(b => b.Filter(terms))) .Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.IncludeAll()), .Source(s => s.IncludeAll()),
ct: cancellationToken); ct: cancellationToken);
@ -457,7 +457,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.DeleteAsync<PersistedScheduledCommand>( await client.DeleteAsync<PersistedScheduledCommand>(
command.Id, command.Id,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(indexName),
ct: cancellationToken); ct: cancellationToken);
} }
catch (Exception) catch (Exception)
@ -477,7 +477,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync( var response = await client.IndexAsync(
persistedCommand, persistedCommand,
dsl => dsl.Index(CreateIndex()).Id(persistedCommand.Id)); dsl => dsl
.Index(CreateIndex(PersistenceIndexConsts.ScheduledCommandIndex))
.Id(persistedCommand.Id));
CheckResponse(response); CheckResponse(response);
} }
@ -485,12 +487,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
{ {
var id = Guid.Parse(eventSubscriptionId); var id = Guid.Parse(eventSubscriptionId);
var indexName = CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex);
var client = CreateClient(); var client = CreateClient();
var response = await client.GetAsync<EventSubscription>( var response = await client.GetAsync<EventSubscription>(
id, id,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(indexName),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -503,7 +506,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var uptResponse = await client.UpdateAsync<EventSubscription>( var uptResponse = await client.UpdateAsync<EventSubscription>(
id, id,
dsl => dsl.Index(CreateIndex()) dsl => dsl.Index(indexName)
.Doc(response.Source), .Doc(response.Source),
ct: cancellationToken); ct: cancellationToken);
@ -521,7 +524,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.DeleteAsync<EventSubscription>( var response = await client.DeleteAsync<EventSubscription>(
id, id,
dsl => dsl.Index(CreateIndex()), dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)),
ct: cancellationToken); ct: cancellationToken);
CheckResponse(response); CheckResponse(response);
@ -532,9 +535,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
return _elasticsearchClientFactory.Create(); return _elasticsearchClientFactory.Create();
} }
private string CreateIndex() private string CreateIndex(string index)
{ {
return _options.IndexFormat; return _indexNameNormalizer.NormalizeIndex(index);
} }
private void CheckResponse(IResponse response) private void CheckResponse(IResponse response)

9
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs

@ -0,0 +1,9 @@
using System.Threading.Tasks;
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
public interface IPersistenceIndexInitializer
{
Task InitializeAsync();
}
}

7
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs

@ -0,0 +1,7 @@
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
public interface IPersistenceIndexNameNormalizer
{
string NormalizeIndex(string index);
}
}

11
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs

@ -0,0 +1,11 @@
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
internal static class PersistenceIndexConsts
{
public const string WorkflowInstanceIndex = "instances";
public const string EventIndex = "events";
public const string EventSubscriptionIndex = "subscriptions";
public const string ExecutionErrorIndex = "executionerrors";
public const string ScheduledCommandIndex = "scheduledcommands";
}
}

163
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs

@ -0,0 +1,163 @@
using LINGYUN.Abp.Elasticsearch;
using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Nest;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Json;
using WorkflowCore.Models;
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
public class PersistenceIndexInitializer : IPersistenceIndexInitializer, ISingletonDependency
{
private readonly ILogger<PersistenceIndexInitializer> _logger;
private readonly AbpJsonOptions _jsonOptions;
private readonly AbpWorkflowCorePersistenceElasticsearchOptions _elasticsearchOptions;
private readonly IPersistenceIndexNameNormalizer _nameNormalizer;
private readonly IElasticsearchClientFactory _clientFactory;
public PersistenceIndexInitializer(
IOptions<AbpJsonOptions> jsonOptions,
IOptions<AbpWorkflowCorePersistenceElasticsearchOptions> elasticsearchOptions,
IPersistenceIndexNameNormalizer nameNormalizer,
IElasticsearchClientFactory clientFactory,
ILogger<PersistenceIndexInitializer> logger)
{
_jsonOptions = jsonOptions.Value;
_elasticsearchOptions = elasticsearchOptions.Value;
_nameNormalizer = nameNormalizer;
_clientFactory = clientFactory;
_logger = logger;
}
public virtual async Task InitializeAsync()
{
var client = _clientFactory.Create();
var dateTimeFormat = !_jsonOptions.DefaultDateTimeFormat.IsNullOrWhiteSpace()
? $"{_jsonOptions.DefaultDateTimeFormat}||strict_date_optional_time||epoch_millis"
: "strict_date_optional_time||epoch_millis";
var indexState = new IndexState
{
Settings = _elasticsearchOptions.IndexSettings,
};
await InitlizeWorkflowInstanceIndex(client, indexState, dateTimeFormat);
await InitlizeEventIndex(client, indexState, dateTimeFormat);
await InitlizeEventSubscriptionIndex(client, indexState, dateTimeFormat);
await InitlizeExecutionErrorIndex(client, indexState, dateTimeFormat);
await InitlizeScheduledCommandIndex(client, indexState, dateTimeFormat);
}
protected virtual async Task InitlizeWorkflowInstanceIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
{
var indexName = _nameNormalizer.NormalizeIndex("instances");
var indexExists = await client.Indices.ExistsAsync(indexName);
if (!indexExists.Exists)
{
var indexCreateResponse = await client.Indices.CreateAsync(
indexName,
dsl => dsl.InitializeUsing(indexState)
.Map<WorkflowInstance>(map => map.AutoMap()
.Properties(mp =>
mp.Date(p => p.Name(n => n.CreateTime).Format(dateTimeFormat))
.Date(p => p.Name(n => n.CompleteTime).Format(dateTimeFormat))
.Nested<ExecutionPointer>(p => p.Name(n => n.ExecutionPointers)
.AutoMap()
.Properties(np =>
np.Date(p => p.Name(n => n.EndTime).Format(dateTimeFormat))
.Date(p => p.Name(n => n.StartTime).Format(dateTimeFormat))
.Date(p => p.Name(n => n.SleepUntil).Format(dateTimeFormat))
.Object<Dictionary<string, object>>(p => p.Name(n => n.ExtensionAttributes)))))));
CheckResponse(indexCreateResponse);
}
}
protected virtual async Task InitlizeEventIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
{
var indexName = _nameNormalizer.NormalizeIndex("events");
var indexExists = await client.Indices.ExistsAsync(indexName);
if (!indexExists.Exists)
{
var indexCreateResponse = await client.Indices.CreateAsync(
indexName,
dsl => dsl.InitializeUsing(indexState)
.Map<Event>(map => map.AutoMap()
.Properties(mp =>
mp.Date(p => p.Name(n => n.EventTime).Format(dateTimeFormat)))));
CheckResponse(indexCreateResponse);
}
}
protected virtual async Task InitlizeEventSubscriptionIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
{
var indexName = _nameNormalizer.NormalizeIndex("subscriptions");
var indexExists = await client.Indices.ExistsAsync(indexName);
if (!indexExists.Exists)
{
var indexCreateResponse = await client.Indices.CreateAsync(
indexName,
dsl => dsl.InitializeUsing(indexState)
.Map<EventSubscription>(map => map.AutoMap()
.Properties(mp =>
mp.Date(p => p.Name(n => n.SubscribeAsOf).Format(dateTimeFormat))
.Date(p => p.Name(n => n.ExternalTokenExpiry).Format(dateTimeFormat)))));
CheckResponse(indexCreateResponse);
}
}
protected virtual async Task InitlizeExecutionErrorIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
{
var indexName = _nameNormalizer.NormalizeIndex("executionerrors");
var indexExists = await client.Indices.ExistsAsync(indexName);
if (!indexExists.Exists)
{
var indexCreateResponse = await client.Indices.CreateAsync(
indexName,
dsl => dsl.InitializeUsing(indexState)
.Map<ExecutionError>(map => map.AutoMap()
.Properties(mp =>
mp.Date(p => p.Name(n => n.ErrorTime).Format(dateTimeFormat)))));
CheckResponse(indexCreateResponse);
}
}
protected virtual async Task InitlizeScheduledCommandIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
{
var indexName = _nameNormalizer.NormalizeIndex("scheduledcommands");
var indexExists = await client.Indices.ExistsAsync(indexName);
if (!indexExists.Exists)
{
var indexCreateResponse = await client.Indices.CreateAsync(
indexName,
dsl => dsl.InitializeUsing(indexState)
.Map<PersistedScheduledCommand>(map => map.AutoMap()));
CheckResponse(indexCreateResponse);
}
}
private void CheckResponse(IResponse response)
{
if (!response.ApiCall.Success)
{
_logger.LogError(default(EventId), response.ApiCall.OriginalException, $"ES Persistence index initlize failed");
throw new AbpException($"ES Operation Failed", response.ApiCall.OriginalException);
}
if (!response.IsValid)
{
_logger.LogWarning("ES Persistence index initlize valid error: {0}", response.DebugInformation);
throw new InvalidOperationException(response.DebugInformation, response.OriginalException);
}
}
}
}

21
aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs

@ -0,0 +1,21 @@
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
public class PersistenceIndexNameNormalizer : IPersistenceIndexNameNormalizer, ISingletonDependency
{
private readonly AbpWorkflowCorePersistenceElasticsearchOptions _options;
public PersistenceIndexNameNormalizer(
IOptions<AbpWorkflowCorePersistenceElasticsearchOptions> options)
{
_options = options.Value;
}
public string NormalizeIndex(string index)
{
return string.Format(_options.IndexFormat, index);
}
}
}
Loading…
Cancel
Save