diff --git a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs
new file mode 100644
index 000000000..13ccf1b9f
--- /dev/null
+++ b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs
@@ -0,0 +1,11 @@
+namespace LINGYUN.Abp.EventBus.CAP
+{
+ public static class AbpCAPHeaders
+ {
+ public static string ClientId { get; set; } = "cap-abp-client-id";
+
+ public static string UserId { get; set; } = "cap-abp-user-id";
+
+ public static string TenantId { get; set; } = "cap-abp-tenant-id";
+ }
+}
diff --git a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs
new file mode 100644
index 000000000..08e770bcd
--- /dev/null
+++ b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs
@@ -0,0 +1,46 @@
+using DotNetCore.CAP.Messages;
+using System;
+
+namespace LINGYUN.Abp.EventBus.CAP
+{
+ ///
+ /// CAP消息扩展
+ ///
+ public static class AbpCAPMessageExtensions
+ {
+ ///
+ /// 尝试获取消息标头中的租户标识
+ ///
+ ///
+ ///
+ ///
+ public static bool TryGetTenantId(
+ this Message message,
+ out Guid tenantId)
+ {
+ if (message.Headers.TryGetValue(AbpCAPHeaders.TenantId, out string tenantStr))
+ {
+ if (Guid.TryParse(tenantStr, out tenantId))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ ///
+ /// 获取消息标头中的租户标识
+ ///
+ ///
+ ///
+ public static Guid? GetTenantIdOrNull(
+ this Message message)
+ {
+ if (message.TryGetTenantId(out Guid tenantId))
+ {
+ return tenantId;
+ }
+ return null;
+ }
+ }
+}
diff --git a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
new file mode 100644
index 000000000..b0bbf1069
--- /dev/null
+++ b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
@@ -0,0 +1,185 @@
+using DotNetCore.CAP;
+using DotNetCore.CAP.Internal;
+using DotNetCore.CAP.Messages;
+using DotNetCore.CAP.Serialization;
+using LINGYUN.Abp.EventBus.CAP.Internal;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Concurrent;
+using System.ComponentModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Volo.Abp.MultiTenancy;
+
+namespace LINGYUN.Abp.EventBus.CAP
+{
+ ///
+ /// 重写 ISubscribeInvoker 实现 Abp 租户集成
+ ///
+ public class AbpCAPSubscribeInvoker : ISubscribeInvoker
+ {
+ private readonly ICurrentTenant _currentTenant;
+
+ private readonly ILogger _logger;
+ private readonly IServiceProvider _serviceProvider;
+ private readonly ISerializer _serializer;
+ private readonly ConcurrentDictionary _executors;
+ ///
+ /// AbpCAPSubscribeInvoker
+ ///
+ ///
+ ///
+ ///
+ ///
+ public AbpCAPSubscribeInvoker(
+ ILoggerFactory loggerFactory,
+ IServiceProvider serviceProvider,
+ ISerializer serializer,
+ ICurrentTenant currentTenant)
+ {
+ _currentTenant = currentTenant;
+ _serviceProvider = serviceProvider;
+ _serializer = serializer;
+ _logger = loggerFactory.CreateLogger();
+ _executors = new ConcurrentDictionary();
+ }
+ ///
+ /// 调用订阅者方法
+ ///
+ ///
+ ///
+ ///
+ public virtual async Task InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var methodInfo = context.ConsumerDescriptor.MethodInfo;
+ var reflectedType = methodInfo.ReflectedType.Name;
+
+ _logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);
+
+ var key = $"{methodInfo.Module.Name}_{reflectedType}_{methodInfo.MetadataToken}";
+ var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
+
+ using var scope = _serviceProvider.CreateScope();
+
+ var provider = scope.ServiceProvider;
+
+ var obj = GetInstance(provider, context);
+
+ var message = context.DeliverMessage;
+ var parameterDescriptors = context.ConsumerDescriptor.Parameters;
+ var executeParameters = new object[parameterDescriptors.Count];
+ // 租户数据可能在消息标头中
+ var tenantId = message.GetTenantIdOrNull();
+ for (var i = 0; i < parameterDescriptors.Count; i++)
+ {
+ if (parameterDescriptors[i].IsFromCap)
+ {
+ executeParameters[i] = new CapHeader(message.Headers);
+ }
+ else
+ {
+ if (message.Value != null)
+ {
+ if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json
+ {
+ var eventData = _serializer.Deserialize(message.Value, parameterDescriptors[i].ParameterType);
+ // 租户数据也可能存在事件数据中
+ if (tenantId == null && eventData is IMultiTenant tenant)
+ {
+ tenantId = tenant.TenantId;
+ }
+ executeParameters[i] = eventData;
+ }
+ else
+ {
+ var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType);
+ if (converter.CanConvertFrom(message.Value.GetType()))
+ {
+ var eventData = converter.ConvertFrom(message.Value);
+ // 租户数据也可能存在事件数据中
+ if (tenantId == null && eventData is IMultiTenant tenant)
+ {
+ tenantId = tenant.TenantId;
+ }
+ executeParameters[i] = eventData;
+ }
+ else
+ {
+ if (parameterDescriptors[i].ParameterType.IsInstanceOfType(message.Value))
+ {
+ // 租户数据也可能存在事件数据中
+ if (tenantId == null && message.Value is IMultiTenant tenant)
+ {
+ tenantId = tenant.TenantId;
+ }
+ executeParameters[i] = message.Value;
+ }
+ else
+ {
+ var eventData = Convert.ChangeType(message.Value, parameterDescriptors[i].ParameterType);
+ // 租户数据也可能存在事件数据中
+ if (tenantId == null && eventData is IMultiTenant tenant)
+ {
+ tenantId = tenant.TenantId;
+ }
+ executeParameters[i] = eventData;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // 改变租户
+ using (_currentTenant.Change(tenantId))
+ {
+ var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
+ return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
+ }
+ }
+ ///
+ /// 获取事件处理类实例
+ ///
+ ///
+ ///
+ ///
+ protected virtual object GetInstance(IServiceProvider provider, ConsumerContext context)
+ {
+ var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType();
+ var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType();
+
+ object obj = null;
+ if (srvType != null)
+ {
+ obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType);
+ }
+
+ if (obj == null)
+ {
+ obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
+ }
+
+ return obj;
+ }
+ ///
+ /// 通过给定的类型实例与参数调用订阅者方法
+ ///
+ ///
+ ///
+ ///
+ ///
+ private async Task