diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj
index 3a4a9a829..2f50762b8 100644
--- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj
+++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.csproj
@@ -10,10 +10,9 @@
false
false
false
-
- Cap分布式事件总线
true
- $(SolutionDir)framework\common\LINGYUN.Abp.EventBus.CAP\LINGYUN.Abp.EventBus.CAP.xml
+ Cap分布式事件总线
+
diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs
new file mode 100644
index 000000000..55d753571
--- /dev/null
+++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPBootstrapper.cs
@@ -0,0 +1,156 @@
+using DotNetCore.CAP;
+using DotNetCore.CAP.Internal;
+using DotNetCore.CAP.Persistence;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace LINGYUN.Abp.EventBus.CAP;
+
+public class AbpCAPBootstrapper : IBootstrapper
+{
+ private readonly ILogger _logger;
+ private readonly IServiceProvider _serviceProvider;
+
+ private CancellationTokenSource _cts;
+ private bool _disposed;
+ private IEnumerable _processors = default!;
+
+ public bool IsStarted => !_cts?.IsCancellationRequested ?? false;
+
+ public AbpCAPBootstrapper(IServiceProvider serviceProvider, ILogger logger)
+ {
+ _serviceProvider = serviceProvider;
+ _logger = logger;
+ }
+
+ public async Task BootstrapAsync(CancellationToken cancellationToken = default)
+ {
+ if (_cts != null)
+ {
+ _logger.LogInformation("### CAP background task is already started!");
+
+ return;
+ }
+
+ _logger.LogDebug("### CAP background task is starting.");
+
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+
+ CheckRequirement();
+
+ _processors = _serviceProvider.GetServices();
+
+ try
+ {
+ await _serviceProvider.GetRequiredService().InitializeAsync(_cts.Token).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ if (e is InvalidOperationException) throw;
+ _logger.LogError(e, "Initializing the storage structure failed!");
+ }
+
+ _cts.Token.Register(() =>
+ {
+ _logger.LogDebug("### CAP background task is stopping.");
+
+
+ foreach (var item in _processors)
+ try
+ {
+ item.Dispose();
+ }
+ catch (OperationCanceledException ex)
+ {
+ _logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'.");
+ }
+ });
+
+ await BootstrapCoreAsync().ConfigureAwait(false);
+
+ _disposed = false;
+ _logger.LogInformation("### CAP started!");
+ }
+
+ protected virtual async Task BootstrapCoreAsync()
+ {
+ foreach (var item in _processors)
+ {
+ try
+ {
+ _cts!.Token.ThrowIfCancellationRequested();
+
+ await item.Start(_cts!.Token);
+ }
+ catch (OperationCanceledException)
+ {
+ // ignore
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Starting the processors throw an exception.");
+ }
+ }
+ }
+
+ public virtual void Dispose()
+ {
+ if (_disposed) return;
+
+ _cts?.Cancel();
+ _cts?.Dispose();
+ _cts = null;
+ _disposed = true;
+ }
+
+ public virtual async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ await BootstrapAsync(stoppingToken).ConfigureAwait(false);
+ }
+
+ public virtual Task StopAsync(CancellationToken cancellationToken)
+ {
+ _cts?.Cancel();
+
+ return Task.CompletedTask;
+ }
+
+ private void CheckRequirement()
+ {
+ var marker = _serviceProvider.GetService();
+ if (marker == null)
+ throw new InvalidOperationException(
+ "AddCap() must be added on the service collection. eg: services.AddCap(...)");
+
+ var messageQueueMarker = _serviceProvider.GetService();
+ if (messageQueueMarker == null)
+ throw new InvalidOperationException(
+ "You must be config transport provider for CAP!" + Environment.NewLine +
+ "==================================================================================" +
+ Environment.NewLine +
+ "======== eg: services.AddCap( options => { options.UseRabbitMQ(...) }); ========" +
+ Environment.NewLine +
+ "==================================================================================");
+
+ var databaseMarker = _serviceProvider.GetService();
+ if (databaseMarker == null)
+ throw new InvalidOperationException(
+ "You must be config storage provider for CAP!" + Environment.NewLine +
+ "===================================================================================" +
+ Environment.NewLine +
+ "======== eg: services.AddCap( options => { options.UseSqlServer(...) }); ========" +
+ Environment.NewLine +
+ "===================================================================================");
+ }
+
+ public ValueTask DisposeAsync()
+ {
+ Dispose();
+
+ return ValueTask.CompletedTask;
+ }
+}
diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs
index 39310dfd5..25b7fa275 100644
--- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs
+++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs
@@ -1,8 +1,12 @@
using DotNetCore.CAP;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using System.Threading;
+using System.Threading.Tasks;
+using Volo.Abp;
using Volo.Abp.EventBus;
using Volo.Abp.Modularity;
+using Volo.Abp.Threading;
namespace LINGYUN.Abp.EventBus.CAP;
@@ -12,6 +16,7 @@ namespace LINGYUN.Abp.EventBus.CAP;
[DependsOn(typeof(AbpEventBusModule))]
public class AbpCAPEventBusModule : AbpModule
{
+ private readonly CancellationTokenSource _cancellationTokenSource = new();
///
/// ConfigureServices
///
@@ -48,4 +53,24 @@ public class AbpCAPEventBusModule : AbpModule
}
});
}
+
+ public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
+ {
+ AsyncHelper.RunSync(() => OnPreApplicationInitializationAsync(context));
+ }
+
+ public async override Task OnPreApplicationInitializationAsync(ApplicationInitializationContext context)
+ {
+ await context
+ .ServiceProvider
+ .GetRequiredService()
+ .BootstrapAsync(_cancellationTokenSource.Token);
+ }
+
+ public override Task OnApplicationShutdownAsync(ApplicationShutdownContext context)
+ {
+ _cancellationTokenSource.Cancel();
+
+ return Task.CompletedTask;
+ }
}
diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs
index 445567aff..81ae8168e 100644
--- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs
@@ -2,7 +2,9 @@
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Serialization;
using LINGYUN.Abp.EventBus.CAP;
+using Microsoft.Extensions.Hosting;
using System;
+using System.Collections.Generic;
namespace Microsoft.Extensions.DependencyInjection;
@@ -23,6 +25,33 @@ public static class ServiceCollectionExtensions
// 替换为自己的实现
services.AddSingleton();
services.AddSingleton();
+
+ // 移除默认CAP启动接口
+ services.RemoveAll(service =>
+ {
+ if (service.ServiceType.IsAssignableFrom(typeof(IBootstrapper)))
+ {
+ return true;
+ }
+ // 默认Bootstrapper
+ if (service.ImplementationType != null &&
+ service.ImplementationType.IsAssignableTo(typeof(IBootstrapper)))
+ {
+ return true;
+ }
+ // 默认Bootstrapper HostService
+ if (service.ImplementationFactory != null &&
+ service.ImplementationFactory.Method.ReturnType.IsAssignableTo(typeof(IBootstrapper)))
+ {
+ return true;
+ }
+
+ return false;
+ });
+ // 使用重写的接口,不使用BackgroundService
+ services.AddSingleton();
+ services.AddSingleton(sp => sp.GetRequiredService());
+
return services;
}
}
diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml
deleted file mode 100644
index 35b6776bb..000000000
--- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml
+++ /dev/null
@@ -1,397 +0,0 @@
-
-
-
- LINGYUN.Abp.EventBus.CAP
-
-
-
-
- 消费者查找器
-
-
-
-
- CAP配置
-
-
-
-
- Abp分布式事件配置
-
-
-
-
- 服务提供者
-
-
-
-
- Creates a new .
-
-
-
-
- 查找消费者集合
-
-
-
-
-
-
- 获取事件处理器集合
-
-
-
-
-
-
-
- AbpCAPEventBusModule
-
-
-
-
- ConfigureServices
-
-
-
-
-
- 过期消息清理配置项
-
-
-
-
- 发布消息处理失败通知
- default: false
-
-
-
-
- AbpECAPExecutionFailedException
-
-
-
-
- MessageType
-
-
-
-
- Message
-
-
-
-
- constructor
-
-
-
-
-
-
- constructor
-
-
-
-
-
-
-
- constructor
-
-
-
-
-
-
-
-
- CAP消息扩展
-
-
-
-
- 尝试获取消息标头中的租户标识
-
-
-
-
-
-
-
- 获取消息标头中的租户标识
-
-
-
-
-
-
- 尝试获取消息标头中的链路标识
-
-
-
-
-
-
-
- 获取消息标头中的链路标识
-
-
-
-
-
-
- 重写 ISubscribeInvoker 实现 Abp 租户集成
-
-
-
-
- AbpCAPSubscribeInvoker
-
-
-
-
-
-
-
-
-
- 调用订阅者方法
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 获取事件处理类实例
-
-
-
-
-
-
-
- 通过给定的类型实例与参数调用订阅者方法
-
-
-
-
-
-
-
-
- CAP分布式事件总线
-
-
-
-
- CAP消息发布接口
-
-
-
-
- 自定义事件注册接口
-
-
-
-
- 本地事件处理器工厂对象集合
-
-
-
-
- 本地事件集合
-
-
-
-
- 当前用户
-
-
-
-
- 当前客户端
-
-
-
-
- typeof
-
-
-
-
- 取消令牌
-
-
-
-
- constructor
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 订阅事件
-
-
-
-
-
-
-
- 退订事件
-
- 事件类型
-
-
-
-
- 退订事件
-
- 事件类型
- 事件处理器
-
-
-
- 退订事件
-
- 事件类型
- 事件处理器工厂
-
-
-
- 退订所有事件
-
- 事件类型
-
-
-
- 发布事件
-
- 事件类型
- 事件数据对象
-
-
-
-
- 获取事件处理器工厂列表
-
-
-
-
-
-
- 自定义事件订阅者
-
-
-
-
- 订阅事件
-
-
-
-
-
-
- 取消订阅
-
-
-
-
-
-
- Executes the configured method on . This can be used whether or not
- the configured method is asynchronous.
-
-
- Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than
- ExecuteAsync if you know at compile time what the return type is, because then you can directly
- "await" that value (via a cast), and then the generated code will be able to reference the
- resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated
- code will have to treat the resulting awaitable as a boxed object, because it doesn't know at
- compile time what type it would be.
-
- The object whose method is to be executed.
- Parameters to pass to the method.
- The method return value.
-
-
-
- Executes the configured method on . This can only be used if the configured
- method is asynchronous.
-
-
- If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync,
- which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations
- as compared with using Execute and then using "await" on the result value typecasted to the known
- awaitable type. The possible extra heap allocations are for:
- 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally
- it's a reference type, and you normally create a new instance per call).
- 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance
- of it, and if it is, it will have to be boxed so the calling code can reference it as an object).
- 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling
- code doesn't know what type it's going to be).
-
- The object whose method is to be executed.
- Parameters to pass to the method.
- An object that you can "await" to get the method return value.
-
-
-
- Provides a common awaitable structure that can
- return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
- application-defined custom awaitable.
-
-
-
-
- Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
- an for mapping instances of that type to a C# awaitable.
-
-
- The main design goal here is to avoid taking a compile-time dependency on
- FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
- to FSharp types have to be constructed dynamically at runtime.
-
-
-
-
- CAP ServiceCollectionExtensions
-
-
-
-
- Adds and configures the consistence services for the consistency.
-
-
-
-
-
-
-