|
|
|
@ -14,6 +14,8 @@ using System.Linq; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using Volo.Abp.MultiTenancy; |
|
|
|
using Volo.Abp.Threading; |
|
|
|
using Volo.Abp.Tracing; |
|
|
|
|
|
|
|
namespace LINGYUN.Abp.EventBus.CAP; |
|
|
|
|
|
|
|
@ -26,6 +28,7 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker |
|
|
|
|
|
|
|
private readonly ILogger _logger; |
|
|
|
private readonly IServiceProvider _serviceProvider; |
|
|
|
private readonly ICorrelationIdProvider _correlationIdProvider; |
|
|
|
private readonly ISerializer _serializer; |
|
|
|
private readonly ConcurrentDictionary<string, ObjectMethodExecutor> _executors; |
|
|
|
/// <summary>
|
|
|
|
@ -33,16 +36,19 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker |
|
|
|
/// </summary>
|
|
|
|
/// <param name="loggerFactory"></param>
|
|
|
|
/// <param name="serviceProvider"></param>
|
|
|
|
/// <param name="correlationIdProvider"></param>
|
|
|
|
/// <param name="serializer"></param>
|
|
|
|
/// <param name="currentTenant"></param>
|
|
|
|
public AbpCAPSubscribeInvoker( |
|
|
|
ILoggerFactory loggerFactory, |
|
|
|
IServiceProvider serviceProvider, |
|
|
|
IServiceProvider serviceProvider, |
|
|
|
ICorrelationIdProvider correlationIdProvider, |
|
|
|
ISerializer serializer, |
|
|
|
ICurrentTenant currentTenant) |
|
|
|
{ |
|
|
|
_currentTenant = currentTenant; |
|
|
|
_serviceProvider = serviceProvider; |
|
|
|
_correlationIdProvider = correlationIdProvider; |
|
|
|
_serializer = serializer; |
|
|
|
_logger = loggerFactory.CreateLogger<SubscribeInvoker>(); |
|
|
|
_executors = new ConcurrentDictionary<string, ObjectMethodExecutor>(); |
|
|
|
@ -66,7 +72,9 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker |
|
|
|
|
|
|
|
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); |
|
|
|
|
|
|
|
using var scope = _serviceProvider.CreateScope(); |
|
|
|
// using var scope = _serviceProvider.CreateScope();
|
|
|
|
// see: https://github.com/dotnetcore/CAP/commit/47c071e8ddf0ea4e636edab66ee7b43e59b602fe
|
|
|
|
await using var scope = _serviceProvider.CreateAsyncScope(); |
|
|
|
|
|
|
|
var provider = scope.ServiceProvider; |
|
|
|
|
|
|
|
@ -77,6 +85,7 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker |
|
|
|
var executeParameters = new object[parameterDescriptors.Count]; |
|
|
|
// 租户数据可能在消息标头中
|
|
|
|
var tenantId = message.GetTenantIdOrNull(); |
|
|
|
var correlationId = message.GetCorrelationIdOrNull(); |
|
|
|
for (var i = 0; i < parameterDescriptors.Count; i++) |
|
|
|
{ |
|
|
|
var parameterDescriptor = parameterDescriptors[i]; |
|
|
|
@ -138,64 +147,68 @@ public class AbpCAPSubscribeInvoker : ISubscribeInvoker |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 改变租户
|
|
|
|
using (_currentTenant.Change(tenantId)) |
|
|
|
// 分布式链路
|
|
|
|
using (_correlationIdProvider.Change(correlationId)) |
|
|
|
{ |
|
|
|
var filter = provider.GetService<ISubscribeFilter>(); |
|
|
|
object resultObj = null; |
|
|
|
|
|
|
|
try |
|
|
|
// 改变租户
|
|
|
|
using (_currentTenant.Change(tenantId)) |
|
|
|
{ |
|
|
|
if (filter != null) |
|
|
|
var filter = provider.GetService<ISubscribeFilter>(); |
|
|
|
object resultObj = null; |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
var etContext = new ExecutingContext(context, executeParameters); |
|
|
|
await filter.OnSubscribeExecutingAsync(etContext); |
|
|
|
executeParameters = etContext.Arguments; |
|
|
|
} |
|
|
|
if (filter != null) |
|
|
|
{ |
|
|
|
var etContext = new ExecutingContext(context, executeParameters); |
|
|
|
await filter.OnSubscribeExecutingAsync(etContext); |
|
|
|
executeParameters = etContext.Arguments; |
|
|
|
} |
|
|
|
|
|
|
|
resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); |
|
|
|
resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); |
|
|
|
|
|
|
|
if (filter != null) |
|
|
|
{ |
|
|
|
var edContext = new ExecutedContext(context, resultObj); |
|
|
|
await filter.OnSubscribeExecutedAsync(edContext); |
|
|
|
resultObj = edContext.Result; |
|
|
|
if (filter != null) |
|
|
|
{ |
|
|
|
var edContext = new ExecutedContext(context, resultObj); |
|
|
|
await filter.OnSubscribeExecutedAsync(edContext); |
|
|
|
resultObj = edContext.Result; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Exception e) |
|
|
|
{ |
|
|
|
if (filter != null) |
|
|
|
catch (Exception e) |
|
|
|
{ |
|
|
|
var exContext = new ExceptionContext(context, e); |
|
|
|
await filter.OnSubscribeExceptionAsync(exContext); |
|
|
|
if (!exContext.ExceptionHandled) |
|
|
|
if (filter != null) |
|
|
|
{ |
|
|
|
throw exContext.Exception; |
|
|
|
} |
|
|
|
var exContext = new ExceptionContext(context, e); |
|
|
|
await filter.OnSubscribeExceptionAsync(exContext); |
|
|
|
if (!exContext.ExceptionHandled) |
|
|
|
{ |
|
|
|
throw exContext.Exception; |
|
|
|
} |
|
|
|
|
|
|
|
if (exContext.Result != null) |
|
|
|
if (exContext.Result != null) |
|
|
|
{ |
|
|
|
resultObj = exContext.Result; |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
resultObj = exContext.Result; |
|
|
|
throw; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var callbackName = message.GetCallbackName(); |
|
|
|
if (string.IsNullOrEmpty(callbackName)) |
|
|
|
{ |
|
|
|
return new ConsumerExecutedResult(resultObj, message.GetId(), null, null); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
throw; |
|
|
|
var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader; |
|
|
|
IDictionary<string, string> callbackHeader = null; |
|
|
|
// TODO: CapHeader.ResponseHeader
|
|
|
|
return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var callbackName = message.GetCallbackName(); |
|
|
|
if (string.IsNullOrEmpty(callbackName)) |
|
|
|
{ |
|
|
|
return new ConsumerExecutedResult(resultObj, message.GetId(), null, null); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader; |
|
|
|
IDictionary<string, string> callbackHeader = null; |
|
|
|
// TODO: CapHeader.ResponseHeader
|
|
|
|
return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
/// <summary>
|
|
|
|
|