From a2ea908de8c4f2797adedb2c64461d78aa76a358 Mon Sep 17 00:00:00 2001
From: cKey <35512826+colinin@users.noreply.github.com>
Date: Thu, 9 Dec 2021 16:07:55 +0800
Subject: [PATCH] feat(workflow): use uniform elasticsearch index
---
...kflowCore.Persistence.Elasticsearch.csproj | 1 +
...kflowCorePersistenceElasticsearchModule.cs | 2 +
...flowCorePersistenceElasticsearchOptions.cs | 10 +-
.../ElasticsearchPersistenceProvider.cs | 87 +++++-----
.../IPersistenceIndexInitializer.cs | 9 +
.../IPersistenceIndexNameNormalizer.cs | 7 +
.../Elasticsearch/PersistenceIndexConsts.cs | 11 ++
.../PersistenceIndexInitializer.cs | 163 ++++++++++++++++++
.../PersistenceIndexNameNormalizer.cs | 21 +++
9 files changed, 266 insertions(+), 45 deletions(-)
create mode 100644 aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs
create mode 100644 aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs
create mode 100644 aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs
create mode 100644 aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs
create mode 100644 aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj
index f4dd0bf87..9b032dfb6 100644
--- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj
@@ -10,6 +10,7 @@
+
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs
index 032403983..c2c44cee7 100644
--- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs
@@ -1,5 +1,6 @@
using LINGYUN.Abp.Elasticsearch;
using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.Json;
using Volo.Abp.Modularity;
using WorkflowCore.Interface;
using WorkflowCore.Models;
@@ -7,6 +8,7 @@ using WorkflowCore.Models;
namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
[DependsOn(typeof(AbpWorkflowCoreModule))]
+ [DependsOn(typeof(AbpJsonModule))]
[DependsOn(typeof(AbpElasticsearchModule))]
public class AbpWorkflowCorePersistenceElasticsearchModule : AbpModule
{
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs
index 4bf9dc672..43f522a3e 100644
--- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs
@@ -1,14 +1,18 @@
-namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
+using Nest;
+
+namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
{
public class AbpWorkflowCorePersistenceElasticsearchOptions
{
///
- /// Default Value: abp.workflows.persistence
+ /// Default Value: abp.workflows.persistence.{0}
///
public string IndexFormat { get; set; }
+ public IIndexSettings IndexSettings { get; set; }
public AbpWorkflowCorePersistenceElasticsearchOptions()
{
- IndexFormat = "abp.workflows.persistence";
+ IndexFormat = "abp.workflows.persistence.{0}";
+ IndexSettings = new IndexSettings();
}
}
}
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs
index 93ca53e38..e1800bd1a 100644
--- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs
@@ -1,7 +1,6 @@
using LINGYUN.Abp.Elasticsearch;
using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models;
using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
using Nest;
using System;
using System.Collections.Generic;
@@ -11,6 +10,7 @@ using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
+using Volo.Abp.Threading;
using WorkflowCore.Interface;
using WorkflowCore.Models;
@@ -21,18 +21,21 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
private readonly ILogger _logger;
private readonly IGuidGenerator _guidGenerator;
+ private readonly IPersistenceIndexNameNormalizer _indexNameNormalizer;
+ private readonly IPersistenceIndexInitializer _indexInitializer;
private readonly IElasticsearchClientFactory _elasticsearchClientFactory;
- private readonly AbpWorkflowCorePersistenceElasticsearchOptions _options;
public ElasticsearchPersistenceProvider(
IGuidGenerator guidGenerator,
IElasticsearchClientFactory elasticsearchClientFactory,
- IOptions options,
+ IPersistenceIndexInitializer indexInitializer,
+ IPersistenceIndexNameNormalizer indexNameNormalizer,
ILogger logger)
{
_guidGenerator = guidGenerator;
_elasticsearchClientFactory = elasticsearchClientFactory;
- _options = options.Value;
+ _indexInitializer = indexInitializer;
+ _indexNameNormalizer = indexNameNormalizer;
_logger = logger;
}
@@ -46,7 +49,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync(
id,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)),
ct: cancellationToken);
CheckResponse(response);
@@ -63,7 +66,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.UpdateAsync(
id,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Doc(response.Source),
ct: cancellationToken);
}
@@ -79,7 +82,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync(
newEvent,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex))
.Id(newEventId),
ct: cancellationToken);
@@ -98,7 +101,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync(
subscription,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Id(newSubscriptionId),
ct: cancellationToken);
@@ -117,7 +120,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync(
workflow,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Id(newWorkflowId),
ct: cancellationToken);
@@ -128,14 +131,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public void EnsureStoreExists()
{
- // TODO: 为什么是同步API...
- var client = CreateClient();
-
- var response = client.Indices.Exists(CreateIndex());
- if (!response.Exists)
- {
- client.Indices.Create(CreateIndex());
- }
+ AsyncHelper.RunSync(async () => await _indexInitializer.InitializeAsync());
}
public virtual async Task GetEvent(string id, CancellationToken cancellationToken = default)
@@ -146,7 +142,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync(
eventId,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex)),
ct: cancellationToken);
CheckResponse(response);
@@ -165,7 +161,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).GreaterThanOrEquals(asOf)));
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex))
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))),
ct: cancellationToken);
@@ -187,7 +183,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(asOf)));
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword"))))
.Sort(s => s.Field(f => f.SubscribeAsOf, SortOrder.Ascending))
@@ -210,7 +206,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).LessThanOrEquals(now)));
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex))
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))),
ct: cancellationToken);
@@ -231,7 +227,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.Term(t => t.Field(f => f.Status).Value(WorkflowStatus.Runnable)));
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))),
ct: cancellationToken);
@@ -249,7 +245,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync(
id,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)),
ct: cancellationToken);
CheckResponse(response);
@@ -269,7 +265,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(now)));
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex))
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.IncludeAll()),
ct: cancellationToken);
@@ -286,7 +282,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.GetAsync(
workflowId,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)),
ct: cancellationToken);
CheckResponse(response);
@@ -318,7 +314,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
}
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.IncludeAll())
.Skip(skip)
@@ -334,7 +330,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var client = CreateClient();
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex))
.Query(q =>
q.Bool(b =>
b.Should(s =>
@@ -350,12 +346,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
{
var eventId = Guid.Parse(id);
+ var indexName = CreateIndex(PersistenceIndexConsts.EventIndex);
var client = CreateClient();
var response = await client.GetAsync(
eventId,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(indexName),
ct: cancellationToken);
CheckResponse(response);
@@ -366,7 +363,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.UpdateAsync(
id,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(indexName)
.Doc(response.Source),
ct: cancellationToken);
}
@@ -375,12 +372,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
{
var eventId = Guid.Parse(id);
+ var indexName = CreateIndex(PersistenceIndexConsts.EventIndex);
var client = CreateClient();
var response = await client.GetAsync(
eventId,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(indexName),
ct: cancellationToken);
CheckResponse(response);
@@ -391,7 +389,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.UpdateAsync(
id,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(indexName)
.Doc(response.Source),
ct: cancellationToken);
}
@@ -406,7 +404,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexManyAsync(
errors,
- CreateIndex(),
+ CreateIndex(PersistenceIndexConsts.ExecutionErrorIndex),
cancellationToken: cancellationToken);
CheckResponse(response);
@@ -416,19 +414,20 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{
var workflowId = Guid.Parse(workflow.Id);
+ var indexName = CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex);
var client = CreateClient();
var response = await client.GetAsync(
workflowId,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(indexName),
ct: cancellationToken);
CheckResponse(response);
await client.UpdateAsync(
workflowId,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(indexName)
.Doc(workflow),
ct: cancellationToken);
}
@@ -436,13 +435,14 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default)
{
var client = CreateClient();
+ var indexName = CreateIndex(PersistenceIndexConsts.ScheduledCommandIndex);
var terms = new List, QueryContainer>>();
terms.Add(x => x.LongRange(t => t.Field(f => f.ExecuteTime).LessThan(asOf.Ticks)));
var response = await client.SearchAsync(
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(indexName)
.Query(q => q.Bool(b => b.Filter(terms)))
.Source(s => s.IncludeAll()),
ct: cancellationToken);
@@ -457,7 +457,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
await client.DeleteAsync(
command.Id,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(indexName),
ct: cancellationToken);
}
catch (Exception)
@@ -477,7 +477,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.IndexAsync(
persistedCommand,
- dsl => dsl.Index(CreateIndex()).Id(persistedCommand.Id));
+ dsl => dsl
+ .Index(CreateIndex(PersistenceIndexConsts.ScheduledCommandIndex))
+ .Id(persistedCommand.Id));
CheckResponse(response);
}
@@ -485,12 +487,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
public virtual async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
{
var id = Guid.Parse(eventSubscriptionId);
+ var indexName = CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex);
var client = CreateClient();
var response = await client.GetAsync(
id,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(indexName),
ct: cancellationToken);
CheckResponse(response);
@@ -503,7 +506,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var uptResponse = await client.UpdateAsync(
id,
- dsl => dsl.Index(CreateIndex())
+ dsl => dsl.Index(indexName)
.Doc(response.Source),
ct: cancellationToken);
@@ -521,7 +524,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
var response = await client.DeleteAsync(
id,
- dsl => dsl.Index(CreateIndex()),
+ dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)),
ct: cancellationToken);
CheckResponse(response);
@@ -532,9 +535,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
return _elasticsearchClientFactory.Create();
}
- private string CreateIndex()
+ private string CreateIndex(string index)
{
- return _options.IndexFormat;
+ return _indexNameNormalizer.NormalizeIndex(index);
}
private void CheckResponse(IResponse response)
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs
new file mode 100644
index 000000000..4bb9d8632
--- /dev/null
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs
@@ -0,0 +1,9 @@
+using System.Threading.Tasks;
+
+namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
+{
+ public interface IPersistenceIndexInitializer
+ {
+ Task InitializeAsync();
+ }
+}
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs
new file mode 100644
index 000000000..6de064e8e
--- /dev/null
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs
@@ -0,0 +1,7 @@
+namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
+{
+ public interface IPersistenceIndexNameNormalizer
+ {
+ string NormalizeIndex(string index);
+ }
+}
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs
new file mode 100644
index 000000000..4dc0c3996
--- /dev/null
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs
@@ -0,0 +1,11 @@
+namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
+{
+ internal static class PersistenceIndexConsts
+ {
+ public const string WorkflowInstanceIndex = "instances";
+ public const string EventIndex = "events";
+ public const string EventSubscriptionIndex = "subscriptions";
+ public const string ExecutionErrorIndex = "executionerrors";
+ public const string ScheduledCommandIndex = "scheduledcommands";
+ }
+}
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs
new file mode 100644
index 000000000..63bcf5b15
--- /dev/null
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs
@@ -0,0 +1,163 @@
+using LINGYUN.Abp.Elasticsearch;
+using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Nest;
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Volo.Abp;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.Json;
+using WorkflowCore.Models;
+
+namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
+{
+ public class PersistenceIndexInitializer : IPersistenceIndexInitializer, ISingletonDependency
+ {
+ private readonly ILogger _logger;
+ private readonly AbpJsonOptions _jsonOptions;
+ private readonly AbpWorkflowCorePersistenceElasticsearchOptions _elasticsearchOptions;
+ private readonly IPersistenceIndexNameNormalizer _nameNormalizer;
+ private readonly IElasticsearchClientFactory _clientFactory;
+
+ public PersistenceIndexInitializer(
+ IOptions jsonOptions,
+ IOptions elasticsearchOptions,
+ IPersistenceIndexNameNormalizer nameNormalizer,
+ IElasticsearchClientFactory clientFactory,
+ ILogger logger)
+ {
+ _jsonOptions = jsonOptions.Value;
+ _elasticsearchOptions = elasticsearchOptions.Value;
+ _nameNormalizer = nameNormalizer;
+ _clientFactory = clientFactory;
+ _logger = logger;
+ }
+
+ public virtual async Task InitializeAsync()
+ {
+ var client = _clientFactory.Create();
+ var dateTimeFormat = !_jsonOptions.DefaultDateTimeFormat.IsNullOrWhiteSpace()
+ ? $"{_jsonOptions.DefaultDateTimeFormat}||strict_date_optional_time||epoch_millis"
+ : "strict_date_optional_time||epoch_millis";
+ var indexState = new IndexState
+ {
+ Settings = _elasticsearchOptions.IndexSettings,
+ };
+
+ await InitlizeWorkflowInstanceIndex(client, indexState, dateTimeFormat);
+ await InitlizeEventIndex(client, indexState, dateTimeFormat);
+ await InitlizeEventSubscriptionIndex(client, indexState, dateTimeFormat);
+ await InitlizeExecutionErrorIndex(client, indexState, dateTimeFormat);
+ await InitlizeScheduledCommandIndex(client, indexState, dateTimeFormat);
+ }
+
+ protected virtual async Task InitlizeWorkflowInstanceIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
+ {
+ var indexName = _nameNormalizer.NormalizeIndex("instances");
+ var indexExists = await client.Indices.ExistsAsync(indexName);
+ if (!indexExists.Exists)
+ {
+ var indexCreateResponse = await client.Indices.CreateAsync(
+ indexName,
+ dsl => dsl.InitializeUsing(indexState)
+ .Map(map => map.AutoMap()
+ .Properties(mp =>
+ mp.Date(p => p.Name(n => n.CreateTime).Format(dateTimeFormat))
+ .Date(p => p.Name(n => n.CompleteTime).Format(dateTimeFormat))
+ .Nested(p => p.Name(n => n.ExecutionPointers)
+ .AutoMap()
+ .Properties(np =>
+ np.Date(p => p.Name(n => n.EndTime).Format(dateTimeFormat))
+ .Date(p => p.Name(n => n.StartTime).Format(dateTimeFormat))
+ .Date(p => p.Name(n => n.SleepUntil).Format(dateTimeFormat))
+ .Object>(p => p.Name(n => n.ExtensionAttributes)))))));
+
+ CheckResponse(indexCreateResponse);
+ }
+ }
+
+ protected virtual async Task InitlizeEventIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
+ {
+ var indexName = _nameNormalizer.NormalizeIndex("events");
+ var indexExists = await client.Indices.ExistsAsync(indexName);
+ if (!indexExists.Exists)
+ {
+ var indexCreateResponse = await client.Indices.CreateAsync(
+ indexName,
+ dsl => dsl.InitializeUsing(indexState)
+ .Map(map => map.AutoMap()
+ .Properties(mp =>
+ mp.Date(p => p.Name(n => n.EventTime).Format(dateTimeFormat)))));
+
+ CheckResponse(indexCreateResponse);
+ }
+ }
+
+ protected virtual async Task InitlizeEventSubscriptionIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
+ {
+ var indexName = _nameNormalizer.NormalizeIndex("subscriptions");
+ var indexExists = await client.Indices.ExistsAsync(indexName);
+ if (!indexExists.Exists)
+ {
+ var indexCreateResponse = await client.Indices.CreateAsync(
+ indexName,
+ dsl => dsl.InitializeUsing(indexState)
+ .Map(map => map.AutoMap()
+ .Properties(mp =>
+ mp.Date(p => p.Name(n => n.SubscribeAsOf).Format(dateTimeFormat))
+ .Date(p => p.Name(n => n.ExternalTokenExpiry).Format(dateTimeFormat)))));
+
+ CheckResponse(indexCreateResponse);
+ }
+ }
+
+ protected virtual async Task InitlizeExecutionErrorIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
+ {
+ var indexName = _nameNormalizer.NormalizeIndex("executionerrors");
+ var indexExists = await client.Indices.ExistsAsync(indexName);
+ if (!indexExists.Exists)
+ {
+ var indexCreateResponse = await client.Indices.CreateAsync(
+ indexName,
+ dsl => dsl.InitializeUsing(indexState)
+ .Map(map => map.AutoMap()
+ .Properties(mp =>
+ mp.Date(p => p.Name(n => n.ErrorTime).Format(dateTimeFormat)))));
+
+ CheckResponse(indexCreateResponse);
+ }
+ }
+
+ protected virtual async Task InitlizeScheduledCommandIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat)
+ {
+ var indexName = _nameNormalizer.NormalizeIndex("scheduledcommands");
+ var indexExists = await client.Indices.ExistsAsync(indexName);
+ if (!indexExists.Exists)
+ {
+ var indexCreateResponse = await client.Indices.CreateAsync(
+ indexName,
+ dsl => dsl.InitializeUsing(indexState)
+ .Map(map => map.AutoMap()));
+
+ CheckResponse(indexCreateResponse);
+ }
+ }
+
+ private void CheckResponse(IResponse response)
+ {
+ if (!response.ApiCall.Success)
+ {
+ _logger.LogError(default(EventId), response.ApiCall.OriginalException, $"ES Persistence index initlize failed");
+ throw new AbpException($"ES Operation Failed", response.ApiCall.OriginalException);
+ }
+
+ if (!response.IsValid)
+ {
+ _logger.LogWarning("ES Persistence index initlize valid error: {0}", response.DebugInformation);
+ throw new InvalidOperationException(response.DebugInformation, response.OriginalException);
+ }
+ }
+ }
+}
diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs
new file mode 100644
index 000000000..1d51f2558
--- /dev/null
+++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs
@@ -0,0 +1,21 @@
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+
+namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch
+{
+ public class PersistenceIndexNameNormalizer : IPersistenceIndexNameNormalizer, ISingletonDependency
+ {
+ private readonly AbpWorkflowCorePersistenceElasticsearchOptions _options;
+
+ public PersistenceIndexNameNormalizer(
+ IOptions options)
+ {
+ _options = options.Value;
+ }
+
+ public string NormalizeIndex(string index)
+ {
+ return string.Format(_options.IndexFormat, index);
+ }
+ }
+}