From 6499dfd33cba9990f08730898f65298e4ffe13c3 Mon Sep 17 00:00:00 2001 From: colin Date: Mon, 10 Nov 2025 14:55:52 +0800 Subject: [PATCH] fix(cap): Fix the issue of the startup sequence of the CAP component --- .../LINGYUN.Abp.EventBus.CAP.csproj | 5 +- .../Abp/EventBus/CAP/AbpCAPBootstrapper.cs | 156 +++++++ .../Abp/EventBus/CAP/AbpCAPEventBusModule.cs | 25 ++ .../ServiceCollectionExtensions.cs | 29 ++ .../LINGYUN.Abp.EventBus.CAP.xml | 397 ------------------ 5 files changed, 212 insertions(+), 400 deletions(-) create mode 100644 aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs delete mode 100644 aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj index 3a4a9a829..2f50762b8 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj @@ -10,10 +10,9 @@ false false false - - Cap分布式事件总线 true - $(SolutionDir)framework\common\LINGYUN.Abp.EventBus.CAP\LINGYUN.Abp.EventBus.CAP.xml + Cap分布式事件总线 + diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs new file mode 100644 index 000000000..55d753571 --- /dev/null +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs @@ -0,0 +1,156 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Persistence; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace LINGYUN.Abp.EventBus.CAP; + +public class AbpCAPBootstrapper : IBootstrapper +{ + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + + private CancellationTokenSource _cts; + private bool _disposed; + private IEnumerable _processors = default!; + + public bool IsStarted => !_cts?.IsCancellationRequested ?? false; + + public AbpCAPBootstrapper(IServiceProvider serviceProvider, ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + public async Task BootstrapAsync(CancellationToken cancellationToken = default) + { + if (_cts != null) + { + _logger.LogInformation("### CAP background task is already started!"); + + return; + } + + _logger.LogDebug("### CAP background task is starting."); + + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + CheckRequirement(); + + _processors = _serviceProvider.GetServices(); + + try + { + await _serviceProvider.GetRequiredService().InitializeAsync(_cts.Token).ConfigureAwait(false); + } + catch (Exception e) + { + if (e is InvalidOperationException) throw; + _logger.LogError(e, "Initializing the storage structure failed!"); + } + + _cts.Token.Register(() => + { + _logger.LogDebug("### CAP background task is stopping."); + + + foreach (var item in _processors) + try + { + item.Dispose(); + } + catch (OperationCanceledException ex) + { + _logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'."); + } + }); + + await BootstrapCoreAsync().ConfigureAwait(false); + + _disposed = false; + _logger.LogInformation("### CAP started!"); + } + + protected virtual async Task BootstrapCoreAsync() + { + foreach (var item in _processors) + { + try + { + _cts!.Token.ThrowIfCancellationRequested(); + + await item.Start(_cts!.Token); + } + catch (OperationCanceledException) + { + // ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "Starting the processors throw an exception."); + } + } + } + + public virtual void Dispose() + { + if (_disposed) return; + + _cts?.Cancel(); + _cts?.Dispose(); + _cts = null; + _disposed = true; + } + + public virtual async Task ExecuteAsync(CancellationToken stoppingToken) + { + await BootstrapAsync(stoppingToken).ConfigureAwait(false); + } + + public virtual Task StopAsync(CancellationToken cancellationToken) + { + _cts?.Cancel(); + + return Task.CompletedTask; + } + + private void CheckRequirement() + { + var marker = _serviceProvider.GetService(); + if (marker == null) + throw new InvalidOperationException( + "AddCap() must be added on the service collection. eg: services.AddCap(...)"); + + var messageQueueMarker = _serviceProvider.GetService(); + if (messageQueueMarker == null) + throw new InvalidOperationException( + "You must be config transport provider for CAP!" + Environment.NewLine + + "==================================================================================" + + Environment.NewLine + + "======== eg: services.AddCap( options => { options.UseRabbitMQ(...) }); ========" + + Environment.NewLine + + "=================================================================================="); + + var databaseMarker = _serviceProvider.GetService(); + if (databaseMarker == null) + throw new InvalidOperationException( + "You must be config storage provider for CAP!" + Environment.NewLine + + "===================================================================================" + + Environment.NewLine + + "======== eg: services.AddCap( options => { options.UseSqlServer(...) }); ========" + + Environment.NewLine + + "==================================================================================="); + } + + public ValueTask DisposeAsync() + { + Dispose(); + + return ValueTask.CompletedTask; + } +} diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs index 39310dfd5..25b7fa275 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs @@ -1,8 +1,12 @@ using DotNetCore.CAP; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp; using Volo.Abp.EventBus; using Volo.Abp.Modularity; +using Volo.Abp.Threading; namespace LINGYUN.Abp.EventBus.CAP; @@ -12,6 +16,7 @@ namespace LINGYUN.Abp.EventBus.CAP; [DependsOn(typeof(AbpEventBusModule))] public class AbpCAPEventBusModule : AbpModule { + private readonly CancellationTokenSource _cancellationTokenSource = new(); /// /// ConfigureServices /// @@ -48,4 +53,24 @@ public class AbpCAPEventBusModule : AbpModule } }); } + + public override void OnPreApplicationInitialization(ApplicationInitializationContext context) + { + AsyncHelper.RunSync(() => OnPreApplicationInitializationAsync(context)); + } + + public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context) + { + await context + .ServiceProvider + .GetRequiredService() + .BootstrapAsync(_cancellationTokenSource.Token); + } + + public override Task OnApplicationShutdownAsync(ApplicationShutdownContext context) + { + _cancellationTokenSource.Cancel(); + + return Task.CompletedTask; + } } diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs index 445567aff..81ae8168e 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs @@ -2,7 +2,9 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Serialization; using LINGYUN.Abp.EventBus.CAP; +using Microsoft.Extensions.Hosting; using System; +using System.Collections.Generic; namespace Microsoft.Extensions.DependencyInjection; @@ -23,6 +25,33 @@ public static class ServiceCollectionExtensions // 替换为自己的实现 services.AddSingleton(); services.AddSingleton(); + + // 移除默认CAP启动接口 + services.RemoveAll(service => + { + if (service.ServiceType.IsAssignableFrom(typeof(IBootstrapper))) + { + return true; + } + // 默认Bootstrapper + if (service.ImplementationType != null && + service.ImplementationType.IsAssignableTo(typeof(IBootstrapper))) + { + return true; + } + // 默认Bootstrapper HostService + if (service.ImplementationFactory != null && + service.ImplementationFactory.Method.ReturnType.IsAssignableTo(typeof(IBootstrapper))) + { + return true; + } + + return false; + }); + // 使用重写的接口,不使用BackgroundService + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + return services; } } diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml deleted file mode 100644 index 35b6776bb..000000000 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml +++ /dev/null @@ -1,397 +0,0 @@ - - - - LINGYUN.Abp.EventBus.CAP - - - - - 消费者查找器 - - - - - CAP配置 - - - - - Abp分布式事件配置 - - - - - 服务提供者 - - - - - Creates a new . - - - - - 查找消费者集合 - - - - - - - 获取事件处理器集合 - - - - - - - - AbpCAPEventBusModule - - - - - ConfigureServices - - - - - - 过期消息清理配置项 - - - - - 发布消息处理失败通知 - default: false - - - - - AbpECAPExecutionFailedException - - - - - MessageType - - - - - Message - - - - - constructor - - - - - - - constructor - - - - - - - - constructor - - - - - - - - - CAP消息扩展 - - - - - 尝试获取消息标头中的租户标识 - - - - - - - - 获取消息标头中的租户标识 - - - - - - - 尝试获取消息标头中的链路标识 - - - - - - - - 获取消息标头中的链路标识 - - - - - - - 重写 ISubscribeInvoker 实现 Abp 租户集成 - - - - - AbpCAPSubscribeInvoker - - - - - - - - - - 调用订阅者方法 - - - - - - - - - - - - - - - - - - 获取事件处理类实例 - - - - - - - - 通过给定的类型实例与参数调用订阅者方法 - - - - - - - - - CAP分布式事件总线 - - - - - CAP消息发布接口 - - - - - 自定义事件注册接口 - - - - - 本地事件处理器工厂对象集合 - - - - - 本地事件集合 - - - - - 当前用户 - - - - - 当前客户端 - - - - - typeof - - - - - 取消令牌 - - - - - constructor - - - - - - - - - - - - - - - - - - - - 订阅事件 - - - - - - - - 退订事件 - - 事件类型 - - - - - 退订事件 - - 事件类型 - 事件处理器 - - - - 退订事件 - - 事件类型 - 事件处理器工厂 - - - - 退订所有事件 - - 事件类型 - - - - 发布事件 - - 事件类型 - 事件数据对象 - - - - - 获取事件处理器工厂列表 - - - - - - - 自定义事件订阅者 - - - - - 订阅事件 - - - - - - - 取消订阅 - - - - - - - Executes the configured method on . This can be used whether or not - the configured method is asynchronous. - - - Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than - ExecuteAsync if you know at compile time what the return type is, because then you can directly - "await" that value (via a cast), and then the generated code will be able to reference the - resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated - code will have to treat the resulting awaitable as a boxed object, because it doesn't know at - compile time what type it would be. - - The object whose method is to be executed. - Parameters to pass to the method. - The method return value. - - - - Executes the configured method on . This can only be used if the configured - method is asynchronous. - - - If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync, - which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations - as compared with using Execute and then using "await" on the result value typecasted to the known - awaitable type. The possible extra heap allocations are for: - 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally - it's a reference type, and you normally create a new instance per call). - 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance - of it, and if it is, it will have to be boxed so the calling code can reference it as an object). - 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling - code doesn't know what type it's going to be). - - The object whose method is to be executed. - Parameters to pass to the method. - An object that you can "await" to get the method return value. - - - - Provides a common awaitable structure that can - return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an - application-defined custom awaitable. - - - - - Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying - an for mapping instances of that type to a C# awaitable. - - - The main design goal here is to avoid taking a compile-time dependency on - FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references - to FSharp types have to be constructed dynamically at runtime. - - - - - CAP ServiceCollectionExtensions - - - - - Adds and configures the consistence services for the consistency. - - - - - - -