From 6fef193f953febd58ce0ad55386505fbc7d92f45 Mon Sep 17 00:00:00 2001 From: colin Date: Tue, 18 Feb 2025 11:02:49 +0800 Subject: [PATCH] upgrade: upgrade cap to 8.3.2 --- Directory.Packages.props | 4 +- .../LINGYUN.Abp.EventBus.CAP.xml | 20 +++- .../LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs | 2 + .../EventBus/CAP/AbpCAPMessageExtensions.cs | 25 +++++ .../EventBus/CAP/AbpCAPSubscribeInvoker.cs | 101 +++++++++-------- .../EventBus/CAP/CAPDistributedEventBus.cs | 106 +++++++++++------- .../LINGYUN.Abp.EventBus.CAP.xml | 20 +++- .../IdentitySessionCacheItemSynchronizer.cs | 10 ++ ...rviceApplicationsSingleModule.Configure.cs | 5 +- 9 files changed, 199 insertions(+), 94 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 72c27a05d..306e69d2e 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,6 +1,6 @@ - 8.2.0 + 8.3.2 2.14.1 3.3.0-rc7 9.0.4 @@ -75,6 +75,7 @@ + @@ -208,6 +209,7 @@ + diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml index 00d831e97..35b6776bb 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml @@ -125,17 +125,33 @@ + + + 尝试获取消息标头中的链路标识 + + + + + + + + 获取消息标头中的链路标识 + + + + 重写 ISubscribeInvoker 实现 Abp 租户集成 - + AbpCAPSubscribeInvoker + @@ -179,7 +195,7 @@ CAP分布式事件总线 - + CAP消息发布接口 diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs index ade52e974..cb7b02f27 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs @@ -9,4 +9,6 @@ public static class AbpCAPHeaders public static string TenantId { get; set; } = "cap-abp-tenant-id"; public static string MessageId { get; set; } = "cap-abp-message-id"; + + public static string CorrelationId { get; set; } = "cap-abp-correlation-id"; } diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs index 8912a6785..b4745f073 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs @@ -43,4 +43,29 @@ public static class AbpCAPMessageExtensions } return null; } + /// + /// 尝试获取消息标头中的链路标识 + /// + /// + /// + /// + public static bool TryGetCorrelationId( + this Message message, + out string correlationId) + { + return message.Headers.TryGetValue(AbpCAPHeaders.CorrelationId, out correlationId); + } + /// + /// 获取消息标头中的链路标识 + /// + /// + /// + public static string GetCorrelationIdOrNull(this Message message) + { + if (message.TryGetCorrelationId(out var correlationId)) + { + return correlationId; + } + return null; + } } diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs index cc3a70975..91398617a 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs @@ -14,6 +14,8 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Volo.Abp.MultiTenancy; +using Volo.Abp.Threading; +using Volo.Abp.Tracing; namespace LINGYUN.Abp.EventBus.CAP; @@ -26,6 +28,7 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; + private readonly ICorrelationIdProvider _correlationIdProvider; private readonly ISerializer _serializer; private readonly ConcurrentDictionary _executors; /// @@ -33,16 +36,19 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker /// /// /// + /// /// /// public AbpCAPSubscribeInvoker( ILoggerFactory loggerFactory, - IServiceProvider serviceProvider, + IServiceProvider serviceProvider, + ICorrelationIdProvider correlationIdProvider, ISerializer serializer, ICurrentTenant currentTenant) { _currentTenant = currentTenant; _serviceProvider = serviceProvider; + _correlationIdProvider = correlationIdProvider; _serializer = serializer; _logger = loggerFactory.CreateLogger(); _executors = new ConcurrentDictionary(); @@ -66,7 +72,9 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); - using var scope = _serviceProvider.CreateScope(); + // using var scope = _serviceProvider.CreateScope(); + // see: https://github.com/dotnetcore/CAP/commit/47c071e8ddf0ea4e636edab66ee7b43e59b602fe + await using var scope = _serviceProvider.CreateAsyncScope(); var provider = scope.ServiceProvider; @@ -77,6 +85,7 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker var executeParameters = new object[parameterDescriptors.Count]; // 租户数据可能在消息标头中 var tenantId = message.GetTenantIdOrNull(); + var correlationId = message.GetCorrelationIdOrNull(); for (var i = 0; i < parameterDescriptors.Count; i++) { var parameterDescriptor = parameterDescriptors[i]; @@ -138,64 +147,68 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker } } - // 改变租户 - using (_currentTenant.Change(tenantId)) + // 分布式链路 + using (_correlationIdProvider.Change(correlationId)) { - var filter = provider.GetService(); - object resultObj = null; - - try + // 改变租户 + using (_currentTenant.Change(tenantId)) { - if (filter != null) + var filter = provider.GetService(); + object resultObj = null; + + try { - var etContext = new ExecutingContext(context, executeParameters); - await filter.OnSubscribeExecutingAsync(etContext); - executeParameters = etContext.Arguments; - } + if (filter != null) + { + var etContext = new ExecutingContext(context, executeParameters); + await filter.OnSubscribeExecutingAsync(etContext); + executeParameters = etContext.Arguments; + } - resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); + resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); - if (filter != null) - { - var edContext = new ExecutedContext(context, resultObj); - await filter.OnSubscribeExecutedAsync(edContext); - resultObj = edContext.Result; + if (filter != null) + { + var edContext = new ExecutedContext(context, resultObj); + await filter.OnSubscribeExecutedAsync(edContext); + resultObj = edContext.Result; + } } - } - catch (Exception e) - { - if (filter != null) + catch (Exception e) { - var exContext = new ExceptionContext(context, e); - await filter.OnSubscribeExceptionAsync(exContext); - if (!exContext.ExceptionHandled) + if (filter != null) { - throw exContext.Exception; - } + var exContext = new ExceptionContext(context, e); + await filter.OnSubscribeExceptionAsync(exContext); + if (!exContext.ExceptionHandled) + { + throw exContext.Exception; + } - if (exContext.Result != null) + if (exContext.Result != null) + { + resultObj = exContext.Result; + } + } + else { - resultObj = exContext.Result; + throw; } } + + var callbackName = message.GetCallbackName(); + if (string.IsNullOrEmpty(callbackName)) + { + return new ConsumerExecutedResult(resultObj, message.GetId(), null, null); + } else { - throw; + var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader; + IDictionary callbackHeader = null; + // TODO: CapHeader.ResponseHeader + return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader); } } - - var callbackName = message.GetCallbackName(); - if (string.IsNullOrEmpty(callbackName)) - { - return new ConsumerExecutedResult(resultObj, message.GetId(), null, null); - } - else - { - var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader; - IDictionary callbackHeader = null; - // TODO: CapHeader.ResponseHeader - return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader); - } } } /// diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs index a2ea5a089..bf5f566be 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs @@ -34,7 +34,7 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent /// /// CAP消息发布接口 /// - protected readonly ICapPublisher CapPublisher; + protected ICapPublisher CapPublisher { get; } /// /// 自定义事件注册接口 /// @@ -166,7 +166,8 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); - await PublishAsync(eventName, eventData); + + await PublishToCapAsync(eventName, eventData, messageId: null, correlationId: CorrelationIdProvider.Get()); } /// @@ -204,9 +205,39 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent return false; } + protected override byte[] Serialize(object eventData) + { + var eventJson = JsonSerializer.Serialize(eventData); + + return Encoding.UTF8.GetBytes(eventJson); + } + + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData); + await PublishToCapAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.Id, outgoingEvent.GetCorrelationId()); + + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) + { + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } + } + + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + { + foreach (var outgoingEvent in outgoingEvents) + { + await PublishFromOutboxAsync(outgoingEvent, outboxConfig); + } } public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) @@ -220,57 +251,46 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent var eventJson = Encoding.UTF8.GetString(incomingEvent.EventData); var eventData = JsonSerializer.Deserialize(eventType, eventJson); var exceptions = new List(); - await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } + if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); } } - protected override byte[] Serialize(object eventData) - { - var eventJson = JsonSerializer.Serialize(eventData); - - return Encoding.UTF8.GetBytes(eventJson); - } - - protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) - { - unitOfWork.AddOrReplaceDistributedEvent(eventRecord); - } - - protected async Task PublishAsync(string eventName, object eventData) + protected virtual async Task PublishToCapAsync(Type eventType, object eventData, Guid? messageId, string correlationId = null) { - await CapPublisher - .PublishAsync( - eventName, eventData, - new Dictionary - { - { AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" }, - { AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" }, - { AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" }, - }, - CancellationTokenProvider.FallbackToProvider()); + await PublishToCapAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, null, correlationId); } - public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + protected virtual async Task PublishToCapAsync(string eventName, object eventData, Guid? messageId, string correlationId = null) { - var outgoingEventArray = outgoingEvents.ToArray(); - - foreach (var outgoingEvent in outgoingEventArray) + var headers = new Dictionary(); + if (messageId.HasValue) + { + headers.TryAdd(AbpCAPHeaders.MessageId, messageId.ToString()); + } + if (CurrentUser.Id.HasValue) + { + headers.TryAdd(AbpCAPHeaders.UserId, CurrentUser.Id.ToString()); + } + if (CurrentTenant.Id.HasValue) + { + headers.TryAdd(AbpCAPHeaders.TenantId, CurrentTenant.Id.ToString()); + } + if (!CurrentClient.Id.IsNullOrWhiteSpace()) { - await CapPublisher - .PublishAsync( - outgoingEvent.EventName, - outgoingEvent.EventData, - new Dictionary - { - { AbpCAPHeaders.MessageId, outgoingEvent.Id.ToString() }, - { AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" }, - { AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" }, - { AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" }, - }, - CancellationTokenProvider.FallbackToProvider()); + headers.TryAdd(AbpCAPHeaders.ClientId, CurrentClient.Id); } + if (!correlationId.IsNullOrWhiteSpace()) + { + headers.TryAdd(AbpCAPHeaders.CorrelationId, correlationId); + } + + await CapPublisher.PublishAsync(eventName, eventData, headers, CancellationTokenProvider.FallbackToProvider()); } } diff --git a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml index 00d831e97..35b6776bb 100644 --- a/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml +++ b/aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml @@ -125,17 +125,33 @@ + + + 尝试获取消息标头中的链路标识 + + + + + + + + 获取消息标头中的链路标识 + + + + 重写 ISubscribeInvoker 实现 Abp 租户集成 - + AbpCAPSubscribeInvoker + @@ -179,7 +195,7 @@ CAP分布式事件总线 - + CAP消息发布接口 diff --git a/aspnet-core/modules/identity/LINGYUN.Abp.Identity.Domain/LINGYUN/Abp/Identity/Session/IdentitySessionCacheItemSynchronizer.cs b/aspnet-core/modules/identity/LINGYUN.Abp.Identity.Domain/LINGYUN/Abp/Identity/Session/IdentitySessionCacheItemSynchronizer.cs index 091466287..bff98e2c2 100644 --- a/aspnet-core/modules/identity/LINGYUN.Abp.Identity.Domain/LINGYUN/Abp/Identity/Session/IdentitySessionCacheItemSynchronizer.cs +++ b/aspnet-core/modules/identity/LINGYUN.Abp.Identity.Domain/LINGYUN/Abp/Identity/Session/IdentitySessionCacheItemSynchronizer.cs @@ -1,14 +1,18 @@ using System; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; +using Volo.Abp.Domain.Entities.Events; using Volo.Abp.Domain.Entities.Events.Distributed; +using Volo.Abp.EventBus; using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Identity; namespace LINGYUN.Abp.Identity.Session; public class IdentitySessionCacheItemSynchronizer : IDistributedEventHandler>, IDistributedEventHandler>, IDistributedEventHandler, + ILocalEventHandler>, ITransientDependency { protected IIdentitySessionCache IdentitySessionCache { get; } @@ -63,4 +67,10 @@ public class IdentitySessionCacheItemSynchronizer : await IdentitySessionCache.RemoveAsync(eventData.SessionId); } } + + public async virtual Task HandleEventAsync(EntityDeletedEventData eventData) + { + // 用户被删除, 移除所有会话 + await IdentitySessionStore.RevokeAllAsync(eventData.Entity.Id); + } } diff --git a/aspnet-core/services/LY.MicroService.Applications.Single/MicroServiceApplicationsSingleModule.Configure.cs b/aspnet-core/services/LY.MicroService.Applications.Single/MicroServiceApplicationsSingleModule.Configure.cs index b86d814e1..854d02a15 100644 --- a/aspnet-core/services/LY.MicroService.Applications.Single/MicroServiceApplicationsSingleModule.Configure.cs +++ b/aspnet-core/services/LY.MicroService.Applications.Single/MicroServiceApplicationsSingleModule.Configure.cs @@ -1,4 +1,3 @@ - using VoloAbpExceptionHandlingOptions = Volo.Abp.AspNetCore.ExceptionHandling.AbpExceptionHandlingOptions; namespace LY.MicroService.Applications.Single; @@ -42,7 +41,9 @@ public partial class MicroServiceApplicationsSingleModule options.UseDashboard(); if (!configuration.GetValue("CAP:IsEnabled")) { - options.UseInMemoryStorage().UseInMemoryMessageQueue(); + options + .UseInMemoryStorage() + .UseRedis(configuration["CAP:Redis:Configuration"]); return; } options