diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj index f4dd0bf87..9b032dfb6 100644 --- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.csproj @@ -10,6 +10,7 @@ + diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs index 032403983..c2c44cee7 100644 --- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchModule.cs @@ -1,5 +1,6 @@ using LINGYUN.Abp.Elasticsearch; using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Json; using Volo.Abp.Modularity; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -7,6 +8,7 @@ using WorkflowCore.Models; namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch { [DependsOn(typeof(AbpWorkflowCoreModule))] + [DependsOn(typeof(AbpJsonModule))] [DependsOn(typeof(AbpElasticsearchModule))] public class AbpWorkflowCorePersistenceElasticsearchModule : AbpModule { diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs index 4bf9dc672..43f522a3e 100644 --- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/AbpWorkflowCorePersistenceElasticsearchOptions.cs @@ -1,14 +1,18 @@ -namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch +using Nest; + +namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch { public class AbpWorkflowCorePersistenceElasticsearchOptions { /// - /// Default Value: abp.workflows.persistence + /// Default Value: abp.workflows.persistence.{0} /// public string IndexFormat { get; set; } + public IIndexSettings IndexSettings { get; set; } public AbpWorkflowCorePersistenceElasticsearchOptions() { - IndexFormat = "abp.workflows.persistence"; + IndexFormat = "abp.workflows.persistence.{0}"; + IndexSettings = new IndexSettings(); } } } diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs index 93ca53e38..e1800bd1a 100644 --- a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/ElasticsearchPersistenceProvider.cs @@ -1,7 +1,6 @@ using LINGYUN.Abp.Elasticsearch; using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; using Nest; using System; using System.Collections.Generic; @@ -11,6 +10,7 @@ using System.Threading.Tasks; using Volo.Abp; using Volo.Abp.DependencyInjection; using Volo.Abp.Guids; +using Volo.Abp.Threading; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -21,18 +21,21 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch private readonly ILogger _logger; private readonly IGuidGenerator _guidGenerator; + private readonly IPersistenceIndexNameNormalizer _indexNameNormalizer; + private readonly IPersistenceIndexInitializer _indexInitializer; private readonly IElasticsearchClientFactory _elasticsearchClientFactory; - private readonly AbpWorkflowCorePersistenceElasticsearchOptions _options; public ElasticsearchPersistenceProvider( IGuidGenerator guidGenerator, IElasticsearchClientFactory elasticsearchClientFactory, - IOptions options, + IPersistenceIndexInitializer indexInitializer, + IPersistenceIndexNameNormalizer indexNameNormalizer, ILogger logger) { _guidGenerator = guidGenerator; _elasticsearchClientFactory = elasticsearchClientFactory; - _options = options.Value; + _indexInitializer = indexInitializer; + _indexNameNormalizer = indexNameNormalizer; _logger = logger; } @@ -46,7 +49,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.GetAsync( id, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)), ct: cancellationToken); CheckResponse(response); @@ -63,7 +66,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch await client.UpdateAsync( id, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)) .Doc(response.Source), ct: cancellationToken); } @@ -79,7 +82,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.IndexAsync( newEvent, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex)) .Id(newEventId), ct: cancellationToken); @@ -98,7 +101,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.IndexAsync( subscription, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)) .Id(newSubscriptionId), ct: cancellationToken); @@ -117,7 +120,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.IndexAsync( workflow, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)) .Id(newWorkflowId), ct: cancellationToken); @@ -128,14 +131,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch public void EnsureStoreExists() { - // TODO: 为什么是同步API... - var client = CreateClient(); - - var response = client.Indices.Exists(CreateIndex()); - if (!response.Exists) - { - client.Indices.Create(CreateIndex()); - } + AsyncHelper.RunSync(async () => await _indexInitializer.InitializeAsync()); } public virtual async Task GetEvent(string id, CancellationToken cancellationToken = default) @@ -146,7 +142,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.GetAsync( eventId, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex)), ct: cancellationToken); CheckResponse(response); @@ -165,7 +161,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).GreaterThanOrEquals(asOf))); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex)) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))), ct: cancellationToken); @@ -187,7 +183,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(asOf))); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))) .Sort(s => s.Field(f => f.SubscribeAsOf, SortOrder.Ascending)) @@ -210,7 +206,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch terms.Add(x => x.DateRange(t => t.Field(f => f.EventTime).LessThanOrEquals(now))); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventIndex)) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))), ct: cancellationToken); @@ -231,7 +227,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch terms.Add(x => x.Term(t => t.Field(f => f.Status).Value(WorkflowStatus.Runnable))); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.Includes(e => e.Field(f => f.Id.Suffix("keyword")))), ct: cancellationToken); @@ -249,7 +245,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.GetAsync( id, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)), ct: cancellationToken); CheckResponse(response); @@ -269,7 +265,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch terms.Add(x => x.DateRange(t => t.Field(f => f.SubscribeAsOf).LessThanOrEquals(now))); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.IncludeAll()), ct: cancellationToken); @@ -286,7 +282,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.GetAsync( workflowId, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)), ct: cancellationToken); CheckResponse(response); @@ -318,7 +314,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch } var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.IncludeAll()) .Skip(skip) @@ -334,7 +330,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var client = CreateClient(); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex)) .Query(q => q.Bool(b => b.Should(s => @@ -350,12 +346,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) { var eventId = Guid.Parse(id); + var indexName = CreateIndex(PersistenceIndexConsts.EventIndex); var client = CreateClient(); var response = await client.GetAsync( eventId, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(indexName), ct: cancellationToken); CheckResponse(response); @@ -366,7 +363,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch await client.UpdateAsync( id, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(indexName) .Doc(response.Source), ct: cancellationToken); } @@ -375,12 +372,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) { var eventId = Guid.Parse(id); + var indexName = CreateIndex(PersistenceIndexConsts.EventIndex); var client = CreateClient(); var response = await client.GetAsync( eventId, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(indexName), ct: cancellationToken); CheckResponse(response); @@ -391,7 +389,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch await client.UpdateAsync( id, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(indexName) .Doc(response.Source), ct: cancellationToken); } @@ -406,7 +404,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.IndexManyAsync( errors, - CreateIndex(), + CreateIndex(PersistenceIndexConsts.ExecutionErrorIndex), cancellationToken: cancellationToken); CheckResponse(response); @@ -416,19 +414,20 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) { var workflowId = Guid.Parse(workflow.Id); + var indexName = CreateIndex(PersistenceIndexConsts.WorkflowInstanceIndex); var client = CreateClient(); var response = await client.GetAsync( workflowId, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(indexName), ct: cancellationToken); CheckResponse(response); await client.UpdateAsync( workflowId, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(indexName) .Doc(workflow), ct: cancellationToken); } @@ -436,13 +435,14 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch public virtual async Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) { var client = CreateClient(); + var indexName = CreateIndex(PersistenceIndexConsts.ScheduledCommandIndex); var terms = new List, QueryContainer>>(); terms.Add(x => x.LongRange(t => t.Field(f => f.ExecuteTime).LessThan(asOf.Ticks))); var response = await client.SearchAsync( - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(indexName) .Query(q => q.Bool(b => b.Filter(terms))) .Source(s => s.IncludeAll()), ct: cancellationToken); @@ -457,7 +457,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch await client.DeleteAsync( command.Id, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(indexName), ct: cancellationToken); } catch (Exception) @@ -477,7 +477,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.IndexAsync( persistedCommand, - dsl => dsl.Index(CreateIndex()).Id(persistedCommand.Id)); + dsl => dsl + .Index(CreateIndex(PersistenceIndexConsts.ScheduledCommandIndex)) + .Id(persistedCommand.Id)); CheckResponse(response); } @@ -485,12 +487,13 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch public virtual async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) { var id = Guid.Parse(eventSubscriptionId); + var indexName = CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex); var client = CreateClient(); var response = await client.GetAsync( id, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(indexName), ct: cancellationToken); CheckResponse(response); @@ -503,7 +506,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var uptResponse = await client.UpdateAsync( id, - dsl => dsl.Index(CreateIndex()) + dsl => dsl.Index(indexName) .Doc(response.Source), ct: cancellationToken); @@ -521,7 +524,7 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch var response = await client.DeleteAsync( id, - dsl => dsl.Index(CreateIndex()), + dsl => dsl.Index(CreateIndex(PersistenceIndexConsts.EventSubscriptionIndex)), ct: cancellationToken); CheckResponse(response); @@ -532,9 +535,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch return _elasticsearchClientFactory.Create(); } - private string CreateIndex() + private string CreateIndex(string index) { - return _options.IndexFormat; + return _indexNameNormalizer.NormalizeIndex(index); } private void CheckResponse(IResponse response) diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs new file mode 100644 index 000000000..4bb9d8632 --- /dev/null +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexInitializer.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch +{ + public interface IPersistenceIndexInitializer + { + Task InitializeAsync(); + } +} diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs new file mode 100644 index 000000000..6de064e8e --- /dev/null +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/IPersistenceIndexNameNormalizer.cs @@ -0,0 +1,7 @@ +namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch +{ + public interface IPersistenceIndexNameNormalizer + { + string NormalizeIndex(string index); + } +} diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs new file mode 100644 index 000000000..4dc0c3996 --- /dev/null +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexConsts.cs @@ -0,0 +1,11 @@ +namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch +{ + internal static class PersistenceIndexConsts + { + public const string WorkflowInstanceIndex = "instances"; + public const string EventIndex = "events"; + public const string EventSubscriptionIndex = "subscriptions"; + public const string ExecutionErrorIndex = "executionerrors"; + public const string ScheduledCommandIndex = "scheduledcommands"; + } +} diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs new file mode 100644 index 000000000..63bcf5b15 --- /dev/null +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexInitializer.cs @@ -0,0 +1,163 @@ +using LINGYUN.Abp.Elasticsearch; +using LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Nest; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Volo.Abp; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Json; +using WorkflowCore.Models; + +namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch +{ + public class PersistenceIndexInitializer : IPersistenceIndexInitializer, ISingletonDependency + { + private readonly ILogger _logger; + private readonly AbpJsonOptions _jsonOptions; + private readonly AbpWorkflowCorePersistenceElasticsearchOptions _elasticsearchOptions; + private readonly IPersistenceIndexNameNormalizer _nameNormalizer; + private readonly IElasticsearchClientFactory _clientFactory; + + public PersistenceIndexInitializer( + IOptions jsonOptions, + IOptions elasticsearchOptions, + IPersistenceIndexNameNormalizer nameNormalizer, + IElasticsearchClientFactory clientFactory, + ILogger logger) + { + _jsonOptions = jsonOptions.Value; + _elasticsearchOptions = elasticsearchOptions.Value; + _nameNormalizer = nameNormalizer; + _clientFactory = clientFactory; + _logger = logger; + } + + public virtual async Task InitializeAsync() + { + var client = _clientFactory.Create(); + var dateTimeFormat = !_jsonOptions.DefaultDateTimeFormat.IsNullOrWhiteSpace() + ? $"{_jsonOptions.DefaultDateTimeFormat}||strict_date_optional_time||epoch_millis" + : "strict_date_optional_time||epoch_millis"; + var indexState = new IndexState + { + Settings = _elasticsearchOptions.IndexSettings, + }; + + await InitlizeWorkflowInstanceIndex(client, indexState, dateTimeFormat); + await InitlizeEventIndex(client, indexState, dateTimeFormat); + await InitlizeEventSubscriptionIndex(client, indexState, dateTimeFormat); + await InitlizeExecutionErrorIndex(client, indexState, dateTimeFormat); + await InitlizeScheduledCommandIndex(client, indexState, dateTimeFormat); + } + + protected virtual async Task InitlizeWorkflowInstanceIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat) + { + var indexName = _nameNormalizer.NormalizeIndex("instances"); + var indexExists = await client.Indices.ExistsAsync(indexName); + if (!indexExists.Exists) + { + var indexCreateResponse = await client.Indices.CreateAsync( + indexName, + dsl => dsl.InitializeUsing(indexState) + .Map(map => map.AutoMap() + .Properties(mp => + mp.Date(p => p.Name(n => n.CreateTime).Format(dateTimeFormat)) + .Date(p => p.Name(n => n.CompleteTime).Format(dateTimeFormat)) + .Nested(p => p.Name(n => n.ExecutionPointers) + .AutoMap() + .Properties(np => + np.Date(p => p.Name(n => n.EndTime).Format(dateTimeFormat)) + .Date(p => p.Name(n => n.StartTime).Format(dateTimeFormat)) + .Date(p => p.Name(n => n.SleepUntil).Format(dateTimeFormat)) + .Object>(p => p.Name(n => n.ExtensionAttributes))))))); + + CheckResponse(indexCreateResponse); + } + } + + protected virtual async Task InitlizeEventIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat) + { + var indexName = _nameNormalizer.NormalizeIndex("events"); + var indexExists = await client.Indices.ExistsAsync(indexName); + if (!indexExists.Exists) + { + var indexCreateResponse = await client.Indices.CreateAsync( + indexName, + dsl => dsl.InitializeUsing(indexState) + .Map(map => map.AutoMap() + .Properties(mp => + mp.Date(p => p.Name(n => n.EventTime).Format(dateTimeFormat))))); + + CheckResponse(indexCreateResponse); + } + } + + protected virtual async Task InitlizeEventSubscriptionIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat) + { + var indexName = _nameNormalizer.NormalizeIndex("subscriptions"); + var indexExists = await client.Indices.ExistsAsync(indexName); + if (!indexExists.Exists) + { + var indexCreateResponse = await client.Indices.CreateAsync( + indexName, + dsl => dsl.InitializeUsing(indexState) + .Map(map => map.AutoMap() + .Properties(mp => + mp.Date(p => p.Name(n => n.SubscribeAsOf).Format(dateTimeFormat)) + .Date(p => p.Name(n => n.ExternalTokenExpiry).Format(dateTimeFormat))))); + + CheckResponse(indexCreateResponse); + } + } + + protected virtual async Task InitlizeExecutionErrorIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat) + { + var indexName = _nameNormalizer.NormalizeIndex("executionerrors"); + var indexExists = await client.Indices.ExistsAsync(indexName); + if (!indexExists.Exists) + { + var indexCreateResponse = await client.Indices.CreateAsync( + indexName, + dsl => dsl.InitializeUsing(indexState) + .Map(map => map.AutoMap() + .Properties(mp => + mp.Date(p => p.Name(n => n.ErrorTime).Format(dateTimeFormat))))); + + CheckResponse(indexCreateResponse); + } + } + + protected virtual async Task InitlizeScheduledCommandIndex(IElasticClient client, IIndexState indexState, string dateTimeFormat) + { + var indexName = _nameNormalizer.NormalizeIndex("scheduledcommands"); + var indexExists = await client.Indices.ExistsAsync(indexName); + if (!indexExists.Exists) + { + var indexCreateResponse = await client.Indices.CreateAsync( + indexName, + dsl => dsl.InitializeUsing(indexState) + .Map(map => map.AutoMap())); + + CheckResponse(indexCreateResponse); + } + } + + private void CheckResponse(IResponse response) + { + if (!response.ApiCall.Success) + { + _logger.LogError(default(EventId), response.ApiCall.OriginalException, $"ES Persistence index initlize failed"); + throw new AbpException($"ES Operation Failed", response.ApiCall.OriginalException); + } + + if (!response.IsValid) + { + _logger.LogWarning("ES Persistence index initlize valid error: {0}", response.DebugInformation); + throw new InvalidOperationException(response.DebugInformation, response.OriginalException); + } + } + } +} diff --git a/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs new file mode 100644 index 000000000..1d51f2558 --- /dev/null +++ b/aspnet-core/modules/workflow/LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch/LINGYUN/Abp/WorkflowCore/Persistence/Elasticsearch/PersistenceIndexNameNormalizer.cs @@ -0,0 +1,21 @@ +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace LINGYUN.Abp.WorkflowCore.Persistence.Elasticsearch +{ + public class PersistenceIndexNameNormalizer : IPersistenceIndexNameNormalizer, ISingletonDependency + { + private readonly AbpWorkflowCorePersistenceElasticsearchOptions _options; + + public PersistenceIndexNameNormalizer( + IOptions options) + { + _options = options.Value; + } + + public string NormalizeIndex(string index) + { + return string.Format(_options.IndexFormat, index); + } + } +}