Browse Source

upgrade: upgrade cap to 8.3.2

pull/1108/head
colin 1 year ago
parent
commit
6fef193f95
  1. 4
      Directory.Packages.props
  2. 20
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml
  3. 2
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs
  4. 25
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs
  5. 101
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
  6. 106
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs
  7. 20
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml
  8. 10
      aspnet-core/modules/identity/LINGYUN.Abp.Identity.Domain/LINGYUN/Abp/Identity/Session/IdentitySessionCacheItemSynchronizer.cs
  9. 5
      aspnet-core/services/LY.MicroService.Applications.Single/MicroServiceApplicationsSingleModule.Configure.cs

4
Directory.Packages.props

@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<DotNetCoreCAPPackageVersion>8.2.0</DotNetCoreCAPPackageVersion>
<DotNetCoreCAPPackageVersion>8.3.2</DotNetCoreCAPPackageVersion>
<ElsaPackageVersion>2.14.1</ElsaPackageVersion>
<ElsaNextPackageVersion>3.3.0-rc7</ElsaNextPackageVersion>
<VoloAbpPackageVersion>9.0.4</VoloAbpPackageVersion>
@ -75,6 +75,7 @@
<PackageVersion Include="Volo.Abp.EventBus.Abstractions" Version="$(VoloAbpPackageVersion)" />
<PackageVersion Include="Volo.Abp.ExceptionHandling" Version="$(VoloAbpPackageVersion)" />
<PackageVersion Include="Volo.Abp.Guids" Version="$(VoloAbpPackageVersion)" />
<PackageVersion Include="Volo.Abp.Gdpr.Abstractions" Version="$(VoloAbpPackageVersion)" />
<PackageVersion Include="Volo.Abp.GlobalFeatures" Version="$(VoloAbpPackageVersion)" />
<PackageVersion Include="Volo.Abp.HangFire" Version="$(VoloAbpPackageVersion)" />
<PackageVersion Include="Volo.Abp.Http.Client" Version="$(VoloAbpPackageVersion)" />
@ -208,6 +209,7 @@
<PackageVersion Include="DotNetCore.CAP.OpenTelemetry" Version="$(DotNetCoreCAPPackageVersion)" />
<PackageVersion Include="DotNetCore.CAP.RabbitMQ" Version="$(DotNetCoreCAPPackageVersion)" />
<PackageVersion Include="DotNetCore.CAP.InMemoryStorage" Version="$(DotNetCoreCAPPackageVersion)" />
<PackageVersion Include="DotNetCore.CAP.RedisStreams" Version="$(DotNetCoreCAPPackageVersion)" />
<PackageVersion Include="Savorboard.CAP.InMemoryMessageQueue" Version="$(DotNetCoreCAPPackageVersion)" />
</ItemGroup>
<!-- Serilog -->

20
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml

@ -125,17 +125,33 @@
<param name="message"></param>
<returns></returns>
</member>
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPMessageExtensions.TryGetCorrelationId(DotNetCore.CAP.Messages.Message,System.String@)">
<summary>
尝试获取消息标头中的链路标识
</summary>
<param name="message"></param>
<param name="correlationId"></param>
<returns></returns>
</member>
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPMessageExtensions.GetCorrelationIdOrNull(DotNetCore.CAP.Messages.Message)">
<summary>
获取消息标头中的链路标识
</summary>
<param name="message"></param>
<returns></returns>
</member>
<member name="T:LINGYUN.Abp.EventBus.CAP.AbpCAPSubscribeInvoker">
<summary>
重写 ISubscribeInvoker 实现 Abp 租户集成
</summary>
</member>
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPSubscribeInvoker.#ctor(Microsoft.Extensions.Logging.ILoggerFactory,System.IServiceProvider,DotNetCore.CAP.Serialization.ISerializer,Volo.Abp.MultiTenancy.ICurrentTenant)">
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPSubscribeInvoker.#ctor(Microsoft.Extensions.Logging.ILoggerFactory,System.IServiceProvider,Volo.Abp.Tracing.ICorrelationIdProvider,DotNetCore.CAP.Serialization.ISerializer,Volo.Abp.MultiTenancy.ICurrentTenant)">
<summary>
AbpCAPSubscribeInvoker
</summary>
<param name="loggerFactory"></param>
<param name="serviceProvider"></param>
<param name="correlationIdProvider"></param>
<param name="serializer"></param>
<param name="currentTenant"></param>
</member>
@ -179,7 +195,7 @@
CAP分布式事件总线
</summary>
</member>
<member name="F:LINGYUN.Abp.EventBus.CAP.CAPDistributedEventBus.CapPublisher">
<member name="P:LINGYUN.Abp.EventBus.CAP.CAPDistributedEventBus.CapPublisher">
<summary>
CAP消息发布接口
</summary>

2
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs

@ -9,4 +9,6 @@ public static class AbpCAPHeaders
public static string TenantId { get; set; } = "cap-abp-tenant-id";
public static string MessageId { get; set; } = "cap-abp-message-id";
public static string CorrelationId { get; set; } = "cap-abp-correlation-id";
}

25
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs

@ -43,4 +43,29 @@ public static class AbpCAPMessageExtensions
}
return null;
}
/// <summary>
/// 尝试获取消息标头中的链路标识
/// </summary>
/// <param name="message"></param>
/// <param name="correlationId"></param>
/// <returns></returns>
public static bool TryGetCorrelationId(
this Message message,
out string correlationId)
{
return message.Headers.TryGetValue(AbpCAPHeaders.CorrelationId, out correlationId);
}
/// <summary>
/// 获取消息标头中的链路标识
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public static string GetCorrelationIdOrNull(this Message message)
{
if (message.TryGetCorrelationId(out var correlationId))
{
return correlationId;
}
return null;
}
}

101
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs

@ -14,6 +14,8 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Tracing;
namespace LINGYUN.Abp.EventBus.CAP;
@ -26,6 +28,7 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly ICorrelationIdProvider _correlationIdProvider;
private readonly ISerializer _serializer;
private readonly ConcurrentDictionary<string, ObjectMethodExecutor> _executors;
/// <summary>
@ -33,16 +36,19 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker
/// </summary>
/// <param name="loggerFactory"></param>
/// <param name="serviceProvider"></param>
/// <param name="correlationIdProvider"></param>
/// <param name="serializer"></param>
/// <param name="currentTenant"></param>
public AbpCAPSubscribeInvoker(
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
IServiceProvider serviceProvider,
ICorrelationIdProvider correlationIdProvider,
ISerializer serializer,
ICurrentTenant currentTenant)
{
_currentTenant = currentTenant;
_serviceProvider = serviceProvider;
_correlationIdProvider = correlationIdProvider;
_serializer = serializer;
_logger = loggerFactory.CreateLogger<SubscribeInvoker>();
_executors = new ConcurrentDictionary<string, ObjectMethodExecutor>();
@ -66,7 +72,9 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
using var scope = _serviceProvider.CreateScope();
// using var scope = _serviceProvider.CreateScope();
// see: https://github.com/dotnetcore/CAP/commit/47c071e8ddf0ea4e636edab66ee7b43e59b602fe
await using var scope = _serviceProvider.CreateAsyncScope();
var provider = scope.ServiceProvider;
@ -77,6 +85,7 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker
var executeParameters = new object[parameterDescriptors.Count];
// 租户数据可能在消息标头中
var tenantId = message.GetTenantIdOrNull();
var correlationId = message.GetCorrelationIdOrNull();
for (var i = 0; i < parameterDescriptors.Count; i++)
{
var parameterDescriptor = parameterDescriptors[i];
@ -138,64 +147,68 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker
}
}
// 改变租户
using (_currentTenant.Change(tenantId))
// 分布式链路
using (_correlationIdProvider.Change(correlationId))
{
var filter = provider.GetService<ISubscribeFilter>();
object resultObj = null;
try
// 改变租户
using (_currentTenant.Change(tenantId))
{
if (filter != null)
var filter = provider.GetService<ISubscribeFilter>();
object resultObj = null;
try
{
var etContext = new ExecutingContext(context, executeParameters);
await filter.OnSubscribeExecutingAsync(etContext);
executeParameters = etContext.Arguments;
}
if (filter != null)
{
var etContext = new ExecutingContext(context, executeParameters);
await filter.OnSubscribeExecutingAsync(etContext);
executeParameters = etContext.Arguments;
}
resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
if (filter != null)
{
var edContext = new ExecutedContext(context, resultObj);
await filter.OnSubscribeExecutedAsync(edContext);
resultObj = edContext.Result;
if (filter != null)
{
var edContext = new ExecutedContext(context, resultObj);
await filter.OnSubscribeExecutedAsync(edContext);
resultObj = edContext.Result;
}
}
}
catch (Exception e)
{
if (filter != null)
catch (Exception e)
{
var exContext = new ExceptionContext(context, e);
await filter.OnSubscribeExceptionAsync(exContext);
if (!exContext.ExceptionHandled)
if (filter != null)
{
throw exContext.Exception;
}
var exContext = new ExceptionContext(context, e);
await filter.OnSubscribeExceptionAsync(exContext);
if (!exContext.ExceptionHandled)
{
throw exContext.Exception;
}
if (exContext.Result != null)
if (exContext.Result != null)
{
resultObj = exContext.Result;
}
}
else
{
resultObj = exContext.Result;
throw;
}
}
var callbackName = message.GetCallbackName();
if (string.IsNullOrEmpty(callbackName))
{
return new ConsumerExecutedResult(resultObj, message.GetId(), null, null);
}
else
{
throw;
var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader;
IDictionary<string, string> callbackHeader = null;
// TODO: CapHeader.ResponseHeader
return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader);
}
}
var callbackName = message.GetCallbackName();
if (string.IsNullOrEmpty(callbackName))
{
return new ConsumerExecutedResult(resultObj, message.GetId(), null, null);
}
else
{
var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader;
IDictionary<string, string> callbackHeader = null;
// TODO: CapHeader.ResponseHeader
return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader);
}
}
}
/// <summary>

106
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs

@ -34,7 +34,7 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent
/// <summary>
/// CAP消息发布接口
/// </summary>
protected readonly ICapPublisher CapPublisher;
protected ICapPublisher CapPublisher { get; }
/// <summary>
/// 自定义事件注册接口
/// </summary>
@ -166,7 +166,8 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
await PublishAsync(eventName, eventData);
await PublishToCapAsync(eventName, eventData, messageId: null, correlationId: CorrelationIdProvider.Get());
}
/// <summary>
@ -204,9 +205,39 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent
return false;
}
protected override byte[] Serialize(object eventData)
{
var eventJson = JsonSerializer.Serialize(eventData);
return Encoding.UTF8.GetBytes(eventJson);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData);
await PublishToCapAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.Id, outgoingEvent.GetCorrelationId());
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
}
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
foreach (var outgoingEvent in outgoingEvents)
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}
}
public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
@ -220,57 +251,46 @@ public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEvent
var eventJson = Encoding.UTF8.GetString(incomingEvent.EventData);
var eventData = JsonSerializer.Deserialize(eventType, eventJson);
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}
protected override byte[] Serialize(object eventData)
{
var eventJson = JsonSerializer.Serialize(eventData);
return Encoding.UTF8.GetBytes(eventJson);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
protected async Task PublishAsync(string eventName, object eventData)
protected virtual async Task PublishToCapAsync(Type eventType, object eventData, Guid? messageId, string correlationId = null)
{
await CapPublisher
.PublishAsync(
eventName, eventData,
new Dictionary<string, string>
{
{ AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" },
{ AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" },
{ AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" },
},
CancellationTokenProvider.FallbackToProvider());
await PublishToCapAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, null, correlationId);
}
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
protected virtual async Task PublishToCapAsync(string eventName, object eventData, Guid? messageId, string correlationId = null)
{
var outgoingEventArray = outgoingEvents.ToArray();
foreach (var outgoingEvent in outgoingEventArray)
var headers = new Dictionary<string, string>();
if (messageId.HasValue)
{
headers.TryAdd(AbpCAPHeaders.MessageId, messageId.ToString());
}
if (CurrentUser.Id.HasValue)
{
headers.TryAdd(AbpCAPHeaders.UserId, CurrentUser.Id.ToString());
}
if (CurrentTenant.Id.HasValue)
{
headers.TryAdd(AbpCAPHeaders.TenantId, CurrentTenant.Id.ToString());
}
if (!CurrentClient.Id.IsNullOrWhiteSpace())
{
await CapPublisher
.PublishAsync(
outgoingEvent.EventName,
outgoingEvent.EventData,
new Dictionary<string, string>
{
{ AbpCAPHeaders.MessageId, outgoingEvent.Id.ToString() },
{ AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" },
{ AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" },
{ AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" },
},
CancellationTokenProvider.FallbackToProvider());
headers.TryAdd(AbpCAPHeaders.ClientId, CurrentClient.Id);
}
if (!correlationId.IsNullOrWhiteSpace())
{
headers.TryAdd(AbpCAPHeaders.CorrelationId, correlationId);
}
await CapPublisher.PublishAsync(eventName, eventData, headers, CancellationTokenProvider.FallbackToProvider());
}
}

20
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN.Abp.EventBus.CAP.xml

@ -125,17 +125,33 @@
<param name="message"></param>
<returns></returns>
</member>
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPMessageExtensions.TryGetCorrelationId(DotNetCore.CAP.Messages.Message,System.String@)">
<summary>
尝试获取消息标头中的链路标识
</summary>
<param name="message"></param>
<param name="correlationId"></param>
<returns></returns>
</member>
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPMessageExtensions.GetCorrelationIdOrNull(DotNetCore.CAP.Messages.Message)">
<summary>
获取消息标头中的链路标识
</summary>
<param name="message"></param>
<returns></returns>
</member>
<member name="T:LINGYUN.Abp.EventBus.CAP.AbpCAPSubscribeInvoker">
<summary>
重写 ISubscribeInvoker 实现 Abp 租户集成
</summary>
</member>
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPSubscribeInvoker.#ctor(Microsoft.Extensions.Logging.ILoggerFactory,System.IServiceProvider,DotNetCore.CAP.Serialization.ISerializer,Volo.Abp.MultiTenancy.ICurrentTenant)">
<member name="M:LINGYUN.Abp.EventBus.CAP.AbpCAPSubscribeInvoker.#ctor(Microsoft.Extensions.Logging.ILoggerFactory,System.IServiceProvider,Volo.Abp.Tracing.ICorrelationIdProvider,DotNetCore.CAP.Serialization.ISerializer,Volo.Abp.MultiTenancy.ICurrentTenant)">
<summary>
AbpCAPSubscribeInvoker
</summary>
<param name="loggerFactory"></param>
<param name="serviceProvider"></param>
<param name="correlationIdProvider"></param>
<param name="serializer"></param>
<param name="currentTenant"></param>
</member>
@ -179,7 +195,7 @@
CAP分布式事件总线
</summary>
</member>
<member name="F:LINGYUN.Abp.EventBus.CAP.CAPDistributedEventBus.CapPublisher">
<member name="P:LINGYUN.Abp.EventBus.CAP.CAPDistributedEventBus.CapPublisher">
<summary>
CAP消息发布接口
</summary>

10
aspnet-core/modules/identity/LINGYUN.Abp.Identity.Domain/LINGYUN/Abp/Identity/Session/IdentitySessionCacheItemSynchronizer.cs

@ -1,14 +1,18 @@
using System;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Identity;
namespace LINGYUN.Abp.Identity.Session;
public class IdentitySessionCacheItemSynchronizer :
IDistributedEventHandler<EntityCreatedEto<IdentitySessionEto>>,
IDistributedEventHandler<EntityDeletedEto<IdentitySessionEto>>,
IDistributedEventHandler<IdentitySessionChangeAccessedEvent>,
ILocalEventHandler<EntityDeletedEventData<IdentityUser>>,
ITransientDependency
{
protected IIdentitySessionCache IdentitySessionCache { get; }
@ -63,4 +67,10 @@ public class IdentitySessionCacheItemSynchronizer :
await IdentitySessionCache.RemoveAsync(eventData.SessionId);
}
}
public async virtual Task HandleEventAsync(EntityDeletedEventData<IdentityUser> eventData)
{
// 用户被删除, 移除所有会话
await IdentitySessionStore.RevokeAllAsync(eventData.Entity.Id);
}
}

5
aspnet-core/services/LY.MicroService.Applications.Single/MicroServiceApplicationsSingleModule.Configure.cs

@ -1,4 +1,3 @@
using VoloAbpExceptionHandlingOptions = Volo.Abp.AspNetCore.ExceptionHandling.AbpExceptionHandlingOptions;
namespace LY.MicroService.Applications.Single;
@ -42,7 +41,9 @@ public partial class MicroServiceApplicationsSingleModule
options.UseDashboard();
if (!configuration.GetValue<bool>("CAP:IsEnabled"))
{
options.UseInMemoryStorage().UseInMemoryMessageQueue();
options
.UseInMemoryStorage()
.UseRedis(configuration["CAP:Redis:Configuration"]);
return;
}
options

Loading…
Cancel
Save