From 81ae954cebffa838eefac264db25b91b9bf3b00a Mon Sep 17 00:00:00 2001 From: maliming Date: Fri, 9 Jun 2023 11:20:48 +0800 Subject: [PATCH 1/9] Change current ICorrelationIdProvider implementations. --- .../AspNetCoreCorrelationIdProvider.cs | 13 ++--- .../Tracing/DefaultCorrelationIdProvider.cs | 21 ++++++-- .../Abp/Tracing/ICorrelationIdProvider.cs | 5 +- .../CorrelationIdProviderController.cs | 17 +++++++ .../CorrelationIdProvider_Tests.cs | 48 +++++++++++++++++++ .../CorrelationIdProvider_Tests.cs | 34 +++++++++++++ 6 files changed, 125 insertions(+), 13 deletions(-) create mode 100644 framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs create mode 100644 framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs create mode 100644 framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs diff --git a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs index 02e5f91596..4d3419c871 100644 --- a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs @@ -7,7 +7,7 @@ using Volo.Abp.Tracing; namespace Volo.Abp.AspNetCore.Tracing; [Dependency(ReplaceServices = true)] -public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider, ITransientDependency +public class AspNetCoreCorrelationIdProvider : DefaultCorrelationIdProvider, ITransientDependency { protected IHttpContextAccessor HttpContextAccessor { get; } protected AbpCorrelationIdOptions Options { get; } @@ -20,11 +20,11 @@ public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider, ITransien Options = options.Value; } - public virtual string Get() + protected override string GetDefaultCorrelationId() { if (HttpContextAccessor.HttpContext?.Request?.Headers == null) { - return CreateNewCorrelationId(); + return base.GetDefaultCorrelationId(); } string correlationId = HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName]; @@ -35,7 +35,7 @@ public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider, ITransien { if (correlationId.IsNullOrEmpty()) { - correlationId = CreateNewCorrelationId(); + correlationId = base.GetDefaultCorrelationId();; HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName] = correlationId; } } @@ -43,9 +43,4 @@ public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider, ITransien return correlationId; } - - protected virtual string CreateNewCorrelationId() - { - return Guid.NewGuid().ToString("N"); - } } diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs index e24e81ca21..d5166325c0 100644 --- a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs @@ -1,16 +1,31 @@ using System; +using System.Threading; using Volo.Abp.DependencyInjection; namespace Volo.Abp.Tracing; public class DefaultCorrelationIdProvider : ICorrelationIdProvider, ISingletonDependency { - public string Get() + private readonly AsyncLocal _currentCorrelationId = new AsyncLocal(); + + private string CorrelationId => _currentCorrelationId.Value ?? GetDefaultCorrelationId(); + + public virtual string Get() + { + return CorrelationId; + } + + public virtual IDisposable Change(string correlationId) { - return CreateNewCorrelationId(); + var parent = CorrelationId; + _currentCorrelationId.Value = correlationId; + return new DisposeAction(() => + { + _currentCorrelationId.Value = parent; + }); } - protected virtual string CreateNewCorrelationId() + protected virtual string GetDefaultCorrelationId() { return Guid.NewGuid().ToString("N"); } diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs index dd94ccb93d..82657a93ff 100644 --- a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs @@ -1,4 +1,5 @@ -using JetBrains.Annotations; +using System; +using JetBrains.Annotations; namespace Volo.Abp.Tracing; @@ -6,4 +7,6 @@ public interface ICorrelationIdProvider { [NotNull] string Get(); + + IDisposable Change(string correlationId); } diff --git a/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs new file mode 100644 index 0000000000..185d206dc6 --- /dev/null +++ b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs @@ -0,0 +1,17 @@ +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.AspNetCore.Mvc; +using Volo.Abp.Tracing; + +namespace Volo.Abp.AspNetCore.CorrelationIdProvider; + +[Route("api/correlation")] +public class CorrelationIdProviderController : AbpController +{ + [HttpGet] + [Route("get")] + public string Get() + { + return this.HttpContext.RequestServices.GetRequiredService().Get(); + } +} diff --git a/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs new file mode 100644 index 0000000000..0db761f57d --- /dev/null +++ b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs @@ -0,0 +1,48 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; +using Shouldly; +using Volo.Abp.AspNetCore.Mvc; +using Xunit; + +namespace Volo.Abp.AspNetCore.CorrelationIdProvider; + +public class CorrelationIdProvider_Tests : AspNetCoreMvcTestBase +{ + [Fact] + public async Task Test() + { + // Test AbpCorrelationIdMiddleware without X-Correlation-Id header + using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/404")) + { + var response = await Client.SendAsync(requestMessage); + response.StatusCode.ShouldBe(HttpStatusCode.NotFound); + + response.Headers.ShouldContain(x => x.Key == "X-Correlation-Id" && x.Value.First() != null); + } + + var correlationId = Guid.NewGuid().ToString("N"); + + // Test AbpCorrelationIdMiddleware + using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/404")) + { + requestMessage.Headers.Add("X-Correlation-Id", correlationId); + var response = await Client.SendAsync(requestMessage); + response.StatusCode.ShouldBe(HttpStatusCode.NotFound); + + response.Headers.ShouldContain(x => x.Key == "X-Correlation-Id" && x.Value.First() == correlationId); + } + + // Test AspNetCoreCorrelationIdProvider + using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/get")) + { + requestMessage.Headers.Add("X-Correlation-Id", correlationId); + var response = await Client.SendAsync(requestMessage); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + (await response.Content.ReadAsStringAsync()).ShouldBe(correlationId); + } + } +} diff --git a/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs new file mode 100644 index 0000000000..53012a851a --- /dev/null +++ b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs @@ -0,0 +1,34 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Volo.Abp.Modularity; +using Volo.Abp.Tracing; +using Xunit; + +namespace Volo.Abp.CorrelationIdProvider; + +public class CorrelationIdProvider_Tests +{ + [Fact] + public async Task Test() + { + using (var application = await AbpApplicationFactory.CreateAsync()) + { + await application.InitializeAsync(); + + var correlationIdProvider = application.ServiceProvider.GetRequiredService(); + var currentCorrelationId = correlationIdProvider.Get(); + currentCorrelationId.ShouldNotBeNull(); + + var correlationId = Guid.NewGuid().ToString("N"); + using (correlationIdProvider.Change(correlationId)) + { + correlationIdProvider.Get().ShouldBe(correlationId); + } + + //The default correlation id always changes. + correlationIdProvider.Get().ShouldNotBe(currentCorrelationId); + } + } +} From 860c6d9c08da3fad4c2f5f239d54c922d9adc39b Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 12 Jun 2023 11:27:55 +0800 Subject: [PATCH 2/9] Change current `correlation` when handling events. --- .../AbpAspNetCoreMvcDaprEventsController.cs | 3 +- .../Tracing/AbpCorrelationIdMiddleware.cs | 34 +++++-- .../AspNetCoreCorrelationIdProvider.cs | 46 ---------- .../Tracing/DefaultCorrelationIdProvider.cs | 13 +-- .../Abp/Tracing/ICorrelationIdProvider.cs | 6 +- .../DistributedEvents/IncomingEventRecord.cs | 9 +- .../DistributedEvents/OutgoingEventRecord.cs | 9 +- .../EventBus/Distributed/IncomingEventInfo.cs | 11 +++ .../EventBus/Distributed/OutgoingEventInfo.cs | 13 ++- .../Volo/Abp/EventBus/EventBusConsts.cs | 6 ++ .../Azure/AzureDistributedEventBus.cs | 55 ++++++++---- .../EventBus/Dapr/DaprDistributedEventBus.cs | 72 +++++++++------ .../Kafka/KafkaDistributedEventBus.cs | 82 ++++++++++++----- .../Abp/EventBus/Kafka/MessageExtensions.cs | 12 +++ .../RabbitMq/RabbitMqDistributedEventBus.cs | 88 ++++++++++++------- .../Rebus/RebusDistributedEventBus.cs | 62 +++++++++---- .../Distributed/DistributedEventBusBase.cs | 55 +++++++----- .../DistributedEvents/IncomingEventRecord.cs | 9 +- .../DistributedEvents/OutgoingEventRecord.cs | 9 +- 19 files changed, 382 insertions(+), 212 deletions(-) delete mode 100644 framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs create mode 100644 framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index da0dad6a70..0a5b5c82af 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -33,7 +33,8 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var distributedEventBus = HttpContext.RequestServices.GetRequiredService(); var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); - await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData); + string correlationId = null; + await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData, correlationId); return Ok(); } } diff --git a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs index 23f1192d1c..2d0740589b 100644 --- a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs +++ b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs @@ -1,4 +1,5 @@ -using Microsoft.AspNetCore.Http; +using System; +using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Options; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; @@ -20,16 +21,37 @@ public class AbpCorrelationIdMiddleware : IMiddleware, ITransientDependency public async Task InvokeAsync(HttpContext context, RequestDelegate next) { - var correlationId = _correlationIdProvider.Get(); + var correlationId = GetCorrelationIdFromRequest(context); - try + using (_correlationIdProvider.Change(correlationId)) { - await next(context); + try + { + await next(context); + } + finally + { + CheckAndSetCorrelationIdOnResponse(context, _options, correlationId); + } } - finally + } + + protected virtual string GetCorrelationIdFromRequest(HttpContext context) + { + string correlationId = context.Request.Headers[_options.HttpHeaderName]; + if (correlationId.IsNullOrEmpty()) { - CheckAndSetCorrelationIdOnResponse(context, _options, correlationId); + lock (context.Request.Headers) + { + if (correlationId.IsNullOrEmpty()) + { + correlationId = Guid.NewGuid().ToString("N"); + context.Request.Headers[_options.HttpHeaderName] = correlationId; + } + } } + + return correlationId; } protected virtual void CheckAndSetCorrelationIdOnResponse( diff --git a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs deleted file mode 100644 index 4d3419c871..0000000000 --- a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs +++ /dev/null @@ -1,46 +0,0 @@ -using System; -using Microsoft.AspNetCore.Http; -using Microsoft.Extensions.Options; -using Volo.Abp.DependencyInjection; -using Volo.Abp.Tracing; - -namespace Volo.Abp.AspNetCore.Tracing; - -[Dependency(ReplaceServices = true)] -public class AspNetCoreCorrelationIdProvider : DefaultCorrelationIdProvider, ITransientDependency -{ - protected IHttpContextAccessor HttpContextAccessor { get; } - protected AbpCorrelationIdOptions Options { get; } - - public AspNetCoreCorrelationIdProvider( - IHttpContextAccessor httpContextAccessor, - IOptions options) - { - HttpContextAccessor = httpContextAccessor; - Options = options.Value; - } - - protected override string GetDefaultCorrelationId() - { - if (HttpContextAccessor.HttpContext?.Request?.Headers == null) - { - return base.GetDefaultCorrelationId(); - } - - string correlationId = HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName]; - - if (correlationId.IsNullOrEmpty()) - { - lock (HttpContextAccessor.HttpContext.Request.Headers) - { - if (correlationId.IsNullOrEmpty()) - { - correlationId = base.GetDefaultCorrelationId();; - HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName] = correlationId; - } - } - } - - return correlationId; - } -} diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs index d5166325c0..578587f449 100644 --- a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs @@ -6,16 +6,16 @@ namespace Volo.Abp.Tracing; public class DefaultCorrelationIdProvider : ICorrelationIdProvider, ISingletonDependency { - private readonly AsyncLocal _currentCorrelationId = new AsyncLocal(); + private readonly AsyncLocal _currentCorrelationId = new AsyncLocal(); - private string CorrelationId => _currentCorrelationId.Value ?? GetDefaultCorrelationId(); + private string? CorrelationId => _currentCorrelationId.Value; - public virtual string Get() + public virtual string? Get() { return CorrelationId; } - public virtual IDisposable Change(string correlationId) + public virtual IDisposable Change(string? correlationId) { var parent = CorrelationId; _currentCorrelationId.Value = correlationId; @@ -24,9 +24,4 @@ public class DefaultCorrelationIdProvider : ICorrelationIdProvider, ISingletonDe _currentCorrelationId.Value = parent; }); } - - protected virtual string GetDefaultCorrelationId() - { - return Guid.NewGuid().ToString("N"); - } } diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs index 82657a93ff..06c3200877 100644 --- a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs @@ -1,12 +1,10 @@ using System; -using JetBrains.Annotations; namespace Volo.Abp.Tracing; public interface ICorrelationIdProvider { - [NotNull] - string Get(); + string? Get(); - IDisposable Change(string correlationId); + IDisposable Change(string? correlationId); } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index c891f3a406..986c19bbbf 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -48,13 +48,20 @@ public class IncomingEventRecord : public IncomingEventInfo ToIncomingEventInfo() { - return new IncomingEventInfo( + var info = new IncomingEventInfo( Id, MessageId, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } public void MarkAsProcessed(DateTime processedTime) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs index 41c4d41ce4..fe639411a2 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs @@ -41,11 +41,18 @@ public class OutgoingEventRecord : public OutgoingEventInfo ToOutgoingEventInfo() { - return new OutgoingEventInfo( + var info = new OutgoingEventInfo( Id, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index d28b28e4d6..372e4e3d77 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; @@ -40,4 +41,14 @@ public class IncomingEventInfo : IHasExtraProperties ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } + + public void SetCorrelationId(string correlationId) + { + ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId; + } + + public string GetCorrelationId() + { + return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString(); + } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs index 359b33f3b3..299935741e 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; @@ -36,4 +37,14 @@ public class OutgoingEventInfo : IHasExtraProperties ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } -} \ No newline at end of file + + public void SetCorrelationId(string correlationId) + { + ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId; + } + + public string GetCorrelationId() + { + return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString(); + } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs new file mode 100644 index 0000000000..f1fa920670 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs @@ -0,0 +1,6 @@ +namespace Volo.Abp.EventBus; + +public static class EventBusConsts +{ + public const string CorrelationIdHeaderName = "X-Correlation-Id"; +} diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 8279a412a5..d7ca68bee6 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; @@ -14,6 +15,7 @@ using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Azure; @@ -42,7 +44,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen IAzureServiceBusMessageConsumerFactory messageConsumerFactory, IPublisherPool publisherPool, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base(serviceScopeFactory, currentTenant, unitOfWorkManager, @@ -50,7 +53,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { Options = abpAzureEventBusOptions.Value; Serializer = serializer; @@ -86,24 +90,30 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); - if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData)) + if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData, message.CorrelationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(message.CorrelationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.Id); + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -125,18 +135,23 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen message.MessageId = outgoingEvent.Id.ToString(); } + message.CorrelationId = outgoingEvent.GetCorrelationId(); + if (!messageBatch.TryAddMessage(message)) { throw new AbpException( "The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number"); } - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } } await publisher.SendMessagesAsync(messageBatch); @@ -152,7 +167,10 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); @@ -244,12 +262,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var body = Serializer.Serialize(eventData); - return PublishAsync(eventName, body, null); + return PublishAsync(eventName, body, CorrelationIdProvider.Get(), null); } protected virtual async Task PublishAsync( string eventName, byte[] body, + [CanBeNull] string correlationId, Guid? eventId) { var message = new ServiceBusMessage(body) @@ -262,6 +281,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen message.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } + message.CorrelationId = correlationId; + var publisher = await PublisherPool.GetAsync( Options.TopicName, Options.ConnectionName); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 23cf8aa1e9..29a2c7c695 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -13,6 +13,7 @@ using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Dapr; @@ -39,8 +40,17 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend IDaprSerializer serializer, IOptions daprEventBusOptions, IAbpDaprClientFactory daprClientFactory, - ILocalEventBus localEventBus) - : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) + : base(serviceScopeFactory, + currentTenant, + unitOfWorkManager, + abpDistributedEventBusOptions, + guidGenerator, + clock, + eventHandlerInvoker, + localEventBus, + correlationIdProvider) { Serializer = serializer; DaprEventBusOptions = daprEventBusOptions.Value; @@ -119,9 +129,9 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - protected override async Task PublishToEventBusAsync(Type eventType, object eventData) + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishToDaprAsync(eventType, eventData); + await PublishToDaprAsync(eventType, eventData, CorrelationIdProvider.Get()); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -141,43 +151,52 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return handlerFactoryList.ToArray(); } - public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) + public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.GetCorrelationId()); } - public override async Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { var outgoingEventArray = outgoingEvents.ToArray(); foreach (var outgoingEvent in outgoingEventArray) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.GetCorrelationId()); } } - public virtual async Task TriggerHandlersAsync(string messageId, Type eventType, object eventData) + public virtual async Task TriggerHandlersAsync(string messageId, Type eventType, object eventData, string correlationId) { - if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData)) + if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) @@ -190,7 +209,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); @@ -226,12 +248,12 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return EventTypes.GetOrDefault(eventName); } - protected virtual async Task PublishToDaprAsync(Type eventType, object eventData) + protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, string correlationId) { - await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData); + await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, correlationId); } - protected virtual async Task PublishToDaprAsync(string eventName, object eventData) + protected virtual async Task PublishToDaprAsync(string eventName, object eventData, string correlationId) { var client = DaprClientFactory.Create(); await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData); diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 62e161fad1..b49ffc07a9 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -14,6 +14,7 @@ using Volo.Abp.Kafka; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Kafka; @@ -42,7 +43,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, @@ -51,7 +53,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; MessageConsumerFactory = messageConsumerFactory; @@ -84,13 +87,17 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen var messageId = message.GetMessageId(); var eventData = Serializer.Deserialize(message.Value, eventType); + var correlationId = message.GetCorrelationId(); - if (await AddToInboxAsync(messageId, eventName, eventType, eventData)) + if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) @@ -163,14 +170,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { + var headers = new Headers + { + { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } + }; + + if (CorrelationIdProvider.Get() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!)); + } + await PublishAsync( AbpKafkaEventBusOptions.TopicName, eventType, eventData, - new Headers - { - { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } - } + headers ); } @@ -179,25 +193,34 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public override async Task PublishFromOutboxAsync( + public async override Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } + + var headers = new Headers + { + { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } + }; + if (outgoingEvent.GetCorrelationId() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); + } await PublishAsync( AbpKafkaEventBusOptions.TopicName, outgoingEvent.EventName, outgoingEvent.EventData, - new Headers - { - { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } - } + headers ); } @@ -214,12 +237,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)} }; - await TriggerDistributedEventSentAsync(new DistributedEventSent() + if (outgoingEvent.GetCorrelationId() != null) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); + } + + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) + { + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } producer.Produce( AbpKafkaEventBusOptions.TopicName, @@ -244,7 +275,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs index 17a80ec87c..569e56a1dc 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs @@ -15,4 +15,16 @@ public static class MessageExtensions return messageId; } + + public static string GetCorrelationId(this Message message) + { + string correlationId = null; + + if (message.Headers.TryGetLastBytes(EventBusConsts.CorrelationIdHeaderName, out var correlationIdBytes)) + { + correlationId = System.Text.Encoding.UTF8.GetString(correlationIdBytes); + } + + return correlationId; + } } diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 8d4756165a..a2459ea8d4 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using RabbitMQ.Client; @@ -15,6 +16,7 @@ using Volo.Abp.MultiTenancy; using Volo.Abp.RabbitMQ; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.RabbitMq; @@ -49,7 +51,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, @@ -58,7 +61,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { ConnectionPool = connectionPool; Serializer = serializer; @@ -103,12 +107,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); - if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData)) + var correlationId = ea.BasicProperties.CorrelationId; + if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) @@ -186,7 +194,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishAsync(eventType, eventData, null); + await PublishAsync(eventType, eventData, correlationId: CorrelationIdProvider.Get()); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -194,18 +202,21 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public override async Task PublishFromOutboxAsync( + public async override Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, null, eventId: outgoingEvent.Id); + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId()); } public async override Task PublishManyFromOutboxAsync( @@ -219,19 +230,22 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe foreach (var outgoingEvent in outgoingEventArray) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } await PublishAsync( channel, outgoingEvent.EventName, outgoingEvent.EventData, - properties: null, - eventId: outgoingEvent.Id); + eventId: outgoingEvent.Id, + correlationId: outgoingEvent.GetCorrelationId()); } channel.WaitForConfirmsOrDie(); @@ -250,7 +264,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); @@ -262,28 +279,29 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe return Serializer.Serialize(eventData); } - public Task PublishAsync( + public virtual Task PublishAsync( Type eventType, object eventData, - IBasicProperties properties, - Dictionary headersArguments = null) + Dictionary headersArguments = null, + Guid? eventId = null, + [CanBeNull] string correlationId = null) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); - return PublishAsync(eventName, body, properties, headersArguments); + return PublishAsync( eventName, body, headersArguments, eventId, correlationId); } protected virtual Task PublishAsync( string eventName, byte[] body, - IBasicProperties properties, Dictionary headersArguments = null, - Guid? eventId = null) + Guid? eventId = null, + [CanBeNull] string correlationId = null) { using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) { - return PublishAsync(channel, eventName, body, properties, headersArguments, eventId); + return PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId); } } @@ -291,23 +309,25 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe IModel channel, string eventName, byte[] body, - IBasicProperties properties, Dictionary headersArguments = null, - Guid? eventId = null) + Guid? eventId = null, + [CanBeNull] string correlationId = null) { EnsureExchangeExists(channel); - if (properties == null) - { - properties = channel.CreateBasicProperties(); - properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; - } + var properties = channel.CreateBasicProperties(); + properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; if (properties.MessageId.IsNullOrEmpty()) { properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } + if (correlationId != null) + { + properties.CorrelationId = correlationId; + } + SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish( diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 0983e98836..560b13ba14 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -16,6 +16,7 @@ using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Rebus; @@ -43,7 +44,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) : + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, @@ -52,7 +54,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { Rebus = rebus; Serializer = serializer; @@ -144,18 +147,27 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var messageId = MessageContext.Current.TransportMessage.GetMessageId(); var eventName = EventNameAttribute.GetNameOrDefault(eventType); + var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName); - if (await AddToInboxAsync(messageId, eventName, eventType, eventData)) + if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishAsync(eventType, eventData); + var headers = new Dictionary(); + if (CorrelationIdProvider.Get() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, CorrelationIdProvider.Get()); + } + await PublishAsync(eventType, eventData, headersArguments: headers); } protected virtual async Task PublishAsync( @@ -234,21 +246,29 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return false; } - public override async Task PublishFromOutboxAsync( + public async override Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id); + var headers = new Dictionary(); + if (outgoingEvent.GetCorrelationId() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()); + } + + await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -259,12 +279,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { foreach (var outgoingEvent in outgoingEventArray) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } await PublishFromOutboxAsync(outgoingEvent, outboxConfig); } @@ -285,7 +308,10 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 89423ce111..2ca5c85bb0 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -2,12 +2,14 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Local; using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Distributed; @@ -18,6 +20,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB protected IClock Clock { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected ILocalEventBus LocalEventBus { get; } + protected ICorrelationIdProvider CorrelationIdProvider { get; } protected DistributedEventBusBase( IServiceScopeFactory serviceScopeFactory, @@ -27,7 +30,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) : base( + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, @@ -37,6 +41,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB Clock = clock; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; LocalEventBus = localEventBus; + CorrelationIdProvider = correlationIdProvider; } public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class @@ -129,14 +134,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB EventData = eventData }); - await eventOutbox.EnqueueAsync( - new OutgoingEventInfo( - GuidGenerator.Create(), - eventName, - Serialize(eventData), - Clock.Now - ) + var outgoingEventInfo = new OutgoingEventInfo( + GuidGenerator.Create(), + eventName, + Serialize(eventData), + Clock.Now ); + outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get()); + await eventOutbox.EnqueueAsync(outgoingEventInfo); return true; } } @@ -153,7 +158,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB string messageId, string eventName, Type eventType, - object eventData) + object eventData, + [CanBeNull] string correlationId) { if (AbpDistributedEventBusOptions.Inboxes.Count <= 0) { @@ -177,22 +183,25 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB } } - await TriggerDistributedEventReceivedAsync(new DistributedEventReceived + using (CorrelationIdProvider.Change(correlationId)) { - Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData - }); - - await eventInbox.EnqueueAsync( - new IncomingEventInfo( - GuidGenerator.Create(), - messageId, - eventName, - Serialize(eventData), - Clock.Now - ) + await TriggerDistributedEventReceivedAsync(new DistributedEventReceived + { + Source = DistributedEventSource.Direct, + EventName = EventNameAttribute.GetNameOrDefault(eventType), + EventData = eventData + }); + } + + var incomingEventInfo = new IncomingEventInfo( + GuidGenerator.Create(), + messageId, + eventName, + Serialize(eventData), + Clock.Now ); + incomingEventInfo.SetCorrelationId(correlationId); + await eventInbox.EnqueueAsync(incomingEventInfo); } } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs index 22c1d860d1..e6ddc87bd6 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs @@ -48,13 +48,20 @@ public class IncomingEventRecord : public IncomingEventInfo ToIncomingEventInfo() { - return new IncomingEventInfo( + var info = new IncomingEventInfo( Id, MessageId, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } public void MarkAsProcessed(DateTime processedTime) diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs index 15dfac38f6..0f0798532b 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs @@ -41,11 +41,18 @@ public class OutgoingEventRecord : public OutgoingEventInfo ToOutgoingEventInfo() { - return new OutgoingEventInfo( + var info = new OutgoingEventInfo( Id, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } } From 26f20deb0cc7605c9f421fdb3130e335dd5c5795 Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 12 Jun 2023 13:45:32 +0800 Subject: [PATCH 3/9] Use `AbpDaprEventData ` to pass `CorrelationId`. --- .../AbpAspNetCoreMvcDaprEventsController.cs | 17 +++++++++--- .../Abp/EventBus/Dapr/AbpDaprEventData.cs | 27 +++++++++++++++++++ .../EventBus/Dapr/DaprDistributedEventBus.cs | 3 ++- 3 files changed, 43 insertions(+), 4 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index 0a5b5c82af..60d9602cc1 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Text.Json; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; @@ -32,9 +33,19 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController } var distributedEventBus = HttpContext.RequestServices.GetRequiredService(); - var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); - string correlationId = null; - await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData, correlationId); + + if (data.Contains("Data") && data.Contains("CorrelationId")) //TODO: Check the json with JSON Schema. + { + var abpDaprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData<>).MakeGenericType(distributedEventBus.GetEventType(topic))); + var eventData = abpDaprEventData.GetType().GetProperties().First(x => x.Name == "Data").GetValue(abpDaprEventData); + var correlationId = abpDaprEventData.GetType().GetProperties().First(x => x.Name == "CorrelationId").GetValue(abpDaprEventData) as string; + await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData, correlationId); + } + else + { + var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); + await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData, null); + } return Ok(); } } diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs new file mode 100644 index 0000000000..25e6bc1b0b --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs @@ -0,0 +1,27 @@ +using System; +using System.Reflection; + +namespace Volo.Abp.EventBus.Dapr; + +public class AbpDaprEventData +{ + public TData Data { get; set; } + + public string CorrelationId { get; set; } + + public AbpDaprEventData(TData data, string correlationId) + { + Data = data; + CorrelationId = correlationId; + } + + public static object Create(object data, string correlationId) + { + return Activator.CreateInstance( + typeof(AbpDaprEventData<>).MakeGenericType(data.GetType()), + BindingFlags.Instance | BindingFlags.Public, + binder: null, + new object[] { data, correlationId }, + culture: null)!; + } +} diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 29a2c7c695..b7b6efcb92 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -256,7 +256,8 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend protected virtual async Task PublishToDaprAsync(string eventName, object eventData, string correlationId) { var client = DaprClientFactory.Create(); - await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData); + var data = AbpDaprEventData.Create(eventData, correlationId); + await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: data); } private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) From 9d1e941d39d7a948e5e9e2484df4c0ff9e0697d3 Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 12 Jun 2023 16:10:38 +0800 Subject: [PATCH 4/9] Add `IsAbpDaprEventData` to check the data. --- .../AbpAspNetCoreMvcDaprEventsController.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index 60d9602cc1..2907037db2 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -34,7 +34,7 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var distributedEventBus = HttpContext.RequestServices.GetRequiredService(); - if (data.Contains("Data") && data.Contains("CorrelationId")) //TODO: Check the json with JSON Schema. + if (IsAbpDaprEventData(data)) { var abpDaprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData<>).MakeGenericType(distributedEventBus.GetEventType(topic))); var eventData = abpDaprEventData.GetType().GetProperties().First(x => x.Name == "Data").GetValue(abpDaprEventData); @@ -48,4 +48,13 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController } return Ok(); } + + protected virtual bool IsAbpDaprEventData(string data) + { + var document = JsonDocument.Parse(data); + var objects = document.RootElement.EnumerateObject().ToList(); + return objects.Count == 2 && + objects.Any(x => x.Name.Equals("correlationId", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("data", StringComparison.CurrentCultureIgnoreCase)); + } } From 7035bd2516ab527b894cc06d3ec9327c2c7a5bdc Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 12 Jun 2023 16:17:05 +0800 Subject: [PATCH 5/9] Update AbpAspNetCoreMvcDaprEventsController.cs --- .../Controllers/AbpAspNetCoreMvcDaprEventsController.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index 2907037db2..706bb8a56c 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -54,7 +54,7 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var document = JsonDocument.Parse(data); var objects = document.RootElement.EnumerateObject().ToList(); return objects.Count == 2 && - objects.Any(x => x.Name.Equals("correlationId", StringComparison.CurrentCultureIgnoreCase)) && - objects.Any(x => x.Name.Equals("data", StringComparison.CurrentCultureIgnoreCase)); + objects.Any(x => x.Name.Equals("data", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("correlationId", StringComparison.CurrentCultureIgnoreCase)); } } From 4ea198656521d07cb853dd5ee29399739278e9dd Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 12 Jun 2023 16:51:04 +0800 Subject: [PATCH 6/9] Update CorrelationIdProvider_Tests.cs --- .../CorrelationIdProvider/CorrelationIdProvider_Tests.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs index 53012a851a..997038afa1 100644 --- a/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs +++ b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs @@ -18,8 +18,8 @@ public class CorrelationIdProvider_Tests await application.InitializeAsync(); var correlationIdProvider = application.ServiceProvider.GetRequiredService(); - var currentCorrelationId = correlationIdProvider.Get(); - currentCorrelationId.ShouldNotBeNull(); + + correlationIdProvider.Get().ShouldBeNull(); var correlationId = Guid.NewGuid().ToString("N"); using (correlationIdProvider.Change(correlationId)) @@ -27,8 +27,7 @@ public class CorrelationIdProvider_Tests correlationIdProvider.Get().ShouldBe(correlationId); } - //The default correlation id always changes. - correlationIdProvider.Get().ShouldNotBe(currentCorrelationId); + correlationIdProvider.Get().ShouldBeNull(); } } } From 75e3383008a6337a4b2ca0a6179b8548fd7df71f Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 13 Jun 2023 13:48:15 +0800 Subject: [PATCH 7/9] Add more properties to `AbpDaprEventData`. --- .../AbpAspNetCoreMvcDaprEventsController.cs | 20 +++++++------ .../Volo/Abp/Dapr/IDaprSerializer.cs | 2 ++ .../Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs | 5 ++++ .../Abp/EventBus/Dapr/AbpDaprEventData.cs | 30 ++++++++----------- .../EventBus/Dapr/DaprDistributedEventBus.cs | 16 +++++----- 5 files changed, 39 insertions(+), 34 deletions(-) diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index 706bb8a56c..456a118d4e 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -22,7 +22,6 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var daprSerializer = HttpContext.RequestServices.GetRequiredService(); var body = (await JsonDocument.ParseAsync(HttpContext.Request.Body)); - var id = body.RootElement.GetProperty("id").GetString(); var pubSubName = body.RootElement.GetProperty("pubsubname").GetString(); var topic = body.RootElement.GetProperty("topic").GetString(); var data = body.RootElement.GetProperty("data").GetRawText(); @@ -36,16 +35,16 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController if (IsAbpDaprEventData(data)) { - var abpDaprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData<>).MakeGenericType(distributedEventBus.GetEventType(topic))); - var eventData = abpDaprEventData.GetType().GetProperties().First(x => x.Name == "Data").GetValue(abpDaprEventData); - var correlationId = abpDaprEventData.GetType().GetProperties().First(x => x.Name == "CorrelationId").GetValue(abpDaprEventData) as string; - await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData, correlationId); + var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); + var eventData = daprSerializer.Deserialize(daprEventData.JsonData, distributedEventBus.GetEventType(daprEventData.Topic)); + await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(daprEventData.Topic), eventData, daprEventData.MessageId, daprEventData.CorrelationId); } else { var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); - await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData, null); + await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(topic), eventData); } + return Ok(); } @@ -53,8 +52,11 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController { var document = JsonDocument.Parse(data); var objects = document.RootElement.EnumerateObject().ToList(); - return objects.Count == 2 && - objects.Any(x => x.Name.Equals("data", StringComparison.CurrentCultureIgnoreCase)) && - objects.Any(x => x.Name.Equals("correlationId", StringComparison.CurrentCultureIgnoreCase)); + return objects.Count == 5 && + objects.Any(x => x.Name.Equals("PubSubName", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("Topic", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("MessageId", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("jsonData", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase)); } } diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs index 9a8b4c9520..1d1861cd97 100644 --- a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs @@ -6,6 +6,8 @@ public interface IDaprSerializer { byte[] Serialize(object obj); + string SerializeToString(object obj); + object Deserialize(byte[] value, Type type); object Deserialize(string value, Type type); diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs index c0924f775b..a1a8324598 100644 --- a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs @@ -19,6 +19,11 @@ public class Utf8JsonDaprSerializer : IDaprSerializer, ITransientDependency return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); } + public string SerializeToString(object obj) + { + return _jsonSerializer.Serialize(obj); + } + public object Deserialize(byte[] value, Type type) { return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs index 25e6bc1b0b..ee08586b8d 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs @@ -1,27 +1,23 @@ -using System; -using System.Reflection; - namespace Volo.Abp.EventBus.Dapr; -public class AbpDaprEventData +public class AbpDaprEventData { - public TData Data { get; set; } + public string PubSubName { get; set; } + + public string Topic { get; set; } + + public string MessageId { get; set; } + + public string JsonData { get; set; } public string CorrelationId { get; set; } - public AbpDaprEventData(TData data, string correlationId) + public AbpDaprEventData(string pubSubName, string topic, string messageId, string jsonData, string correlationId) { - Data = data; + PubSubName = pubSubName; + Topic = topic; + MessageId = messageId; + JsonData = jsonData; CorrelationId = correlationId; } - - public static object Create(object data, string correlationId) - { - return Activator.CreateInstance( - typeof(AbpDaprEventData<>).MakeGenericType(data.GetType()), - BindingFlags.Instance | BindingFlags.Public, - binder: null, - new object[] { data, correlationId }, - culture: null)!; - } } diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index b7b6efcb92..4912a58388 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -131,7 +131,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishToDaprAsync(eventType, eventData, CorrelationIdProvider.Get()); + await PublishToDaprAsync(eventType, eventData, null, CorrelationIdProvider.Get()); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -163,7 +163,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend }); } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.GetCorrelationId()); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -182,11 +182,11 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend }); } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.GetCorrelationId()); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); } } - public virtual async Task TriggerHandlersAsync(string messageId, Type eventType, object eventData, string correlationId) + public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, string messageId = null, string correlationId = null) { if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, correlationId)) { @@ -248,15 +248,15 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return EventTypes.GetOrDefault(eventName); } - protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, string correlationId) + protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string correlationId = null) { - await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, correlationId); + await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, messageId, correlationId); } - protected virtual async Task PublishToDaprAsync(string eventName, object eventData, string correlationId) + protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string correlationId = null) { var client = DaprClientFactory.Create(); - var data = AbpDaprEventData.Create(eventData, correlationId); + var data = new AbpDaprEventData(DaprEventBusOptions.PubSubName, eventName, (messageId ?? GuidGenerator.Create()).ToString("N"), Serializer.SerializeToString(eventData), correlationId); await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: data); } From 88be3f231b9f527eb4ab447c4f39ea782e2ea230 Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 13 Jun 2023 13:52:56 +0800 Subject: [PATCH 8/9] Update AbpAspNetCoreMvcDaprEventsController.cs --- .../Controllers/AbpAspNetCoreMvcDaprEventsController.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index 456a118d4e..b1d5bac7fb 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -56,7 +56,7 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController objects.Any(x => x.Name.Equals("PubSubName", StringComparison.CurrentCultureIgnoreCase)) && objects.Any(x => x.Name.Equals("Topic", StringComparison.CurrentCultureIgnoreCase)) && objects.Any(x => x.Name.Equals("MessageId", StringComparison.CurrentCultureIgnoreCase)) && - objects.Any(x => x.Name.Equals("jsonData", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) && objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase)); } } From d161e2579b6ba0ba0f31f4c10d034b71949b52f3 Mon Sep 17 00:00:00 2001 From: maliming Date: Mon, 19 Jun 2023 14:59:14 +0800 Subject: [PATCH 9/9] Remove `lock` code. --- .../AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs index 2d0740589b..33f5de3582 100644 --- a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs +++ b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs @@ -41,14 +41,8 @@ public class AbpCorrelationIdMiddleware : IMiddleware, ITransientDependency string correlationId = context.Request.Headers[_options.HttpHeaderName]; if (correlationId.IsNullOrEmpty()) { - lock (context.Request.Headers) - { - if (correlationId.IsNullOrEmpty()) - { - correlationId = Guid.NewGuid().ToString("N"); - context.Request.Headers[_options.HttpHeaderName] = correlationId; - } - } + correlationId = Guid.NewGuid().ToString("N"); + context.Request.Headers[_options.HttpHeaderName] = correlationId; } return correlationId;