diff --git a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs index da0dad6a70..b1d5bac7fb 100644 --- a/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs +++ b/framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Text.Json; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; @@ -21,7 +22,6 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController var daprSerializer = HttpContext.RequestServices.GetRequiredService(); var body = (await JsonDocument.ParseAsync(HttpContext.Request.Body)); - var id = body.RootElement.GetProperty("id").GetString(); var pubSubName = body.RootElement.GetProperty("pubsubname").GetString(); var topic = body.RootElement.GetProperty("topic").GetString(); var data = body.RootElement.GetProperty("data").GetRawText(); @@ -32,8 +32,31 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController } var distributedEventBus = HttpContext.RequestServices.GetRequiredService(); - var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); - await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData); + + if (IsAbpDaprEventData(data)) + { + var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As(); + var eventData = daprSerializer.Deserialize(daprEventData.JsonData, distributedEventBus.GetEventType(daprEventData.Topic)); + await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(daprEventData.Topic), eventData, daprEventData.MessageId, daprEventData.CorrelationId); + } + else + { + var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic)); + await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(topic), eventData); + } + return Ok(); } + + protected virtual bool IsAbpDaprEventData(string data) + { + var document = JsonDocument.Parse(data); + var objects = document.RootElement.EnumerateObject().ToList(); + return objects.Count == 5 && + objects.Any(x => x.Name.Equals("PubSubName", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("Topic", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("MessageId", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) && + objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase)); + } } diff --git a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs index 23f1192d1c..33f5de3582 100644 --- a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs +++ b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs @@ -1,4 +1,5 @@ -using Microsoft.AspNetCore.Http; +using System; +using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Options; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; @@ -20,16 +21,31 @@ public class AbpCorrelationIdMiddleware : IMiddleware, ITransientDependency public async Task InvokeAsync(HttpContext context, RequestDelegate next) { - var correlationId = _correlationIdProvider.Get(); + var correlationId = GetCorrelationIdFromRequest(context); - try + using (_correlationIdProvider.Change(correlationId)) { - await next(context); + try + { + await next(context); + } + finally + { + CheckAndSetCorrelationIdOnResponse(context, _options, correlationId); + } } - finally + } + + protected virtual string GetCorrelationIdFromRequest(HttpContext context) + { + string correlationId = context.Request.Headers[_options.HttpHeaderName]; + if (correlationId.IsNullOrEmpty()) { - CheckAndSetCorrelationIdOnResponse(context, _options, correlationId); + correlationId = Guid.NewGuid().ToString("N"); + context.Request.Headers[_options.HttpHeaderName] = correlationId; } + + return correlationId; } protected virtual void CheckAndSetCorrelationIdOnResponse( diff --git a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs b/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs deleted file mode 100644 index 02e5f91596..0000000000 --- a/framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using Microsoft.AspNetCore.Http; -using Microsoft.Extensions.Options; -using Volo.Abp.DependencyInjection; -using Volo.Abp.Tracing; - -namespace Volo.Abp.AspNetCore.Tracing; - -[Dependency(ReplaceServices = true)] -public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider, ITransientDependency -{ - protected IHttpContextAccessor HttpContextAccessor { get; } - protected AbpCorrelationIdOptions Options { get; } - - public AspNetCoreCorrelationIdProvider( - IHttpContextAccessor httpContextAccessor, - IOptions options) - { - HttpContextAccessor = httpContextAccessor; - Options = options.Value; - } - - public virtual string Get() - { - if (HttpContextAccessor.HttpContext?.Request?.Headers == null) - { - return CreateNewCorrelationId(); - } - - string correlationId = HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName]; - - if (correlationId.IsNullOrEmpty()) - { - lock (HttpContextAccessor.HttpContext.Request.Headers) - { - if (correlationId.IsNullOrEmpty()) - { - correlationId = CreateNewCorrelationId(); - HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName] = correlationId; - } - } - } - - return correlationId; - } - - protected virtual string CreateNewCorrelationId() - { - return Guid.NewGuid().ToString("N"); - } -} diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs index e24e81ca21..578587f449 100644 --- a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs @@ -1,17 +1,27 @@ using System; +using System.Threading; using Volo.Abp.DependencyInjection; namespace Volo.Abp.Tracing; public class DefaultCorrelationIdProvider : ICorrelationIdProvider, ISingletonDependency { - public string Get() + private readonly AsyncLocal _currentCorrelationId = new AsyncLocal(); + + private string? CorrelationId => _currentCorrelationId.Value; + + public virtual string? Get() { - return CreateNewCorrelationId(); + return CorrelationId; } - protected virtual string CreateNewCorrelationId() + public virtual IDisposable Change(string? correlationId) { - return Guid.NewGuid().ToString("N"); + var parent = CorrelationId; + _currentCorrelationId.Value = correlationId; + return new DisposeAction(() => + { + _currentCorrelationId.Value = parent; + }); } } diff --git a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs index dd94ccb93d..06c3200877 100644 --- a/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs +++ b/framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs @@ -1,9 +1,10 @@ -using JetBrains.Annotations; +using System; namespace Volo.Abp.Tracing; public interface ICorrelationIdProvider { - [NotNull] - string Get(); + string? Get(); + + IDisposable Change(string? correlationId); } diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs index 9a8b4c9520..1d1861cd97 100644 --- a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs @@ -6,6 +6,8 @@ public interface IDaprSerializer { byte[] Serialize(object obj); + string SerializeToString(object obj); + object Deserialize(byte[] value, Type type); object Deserialize(string value, Type type); diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs index c0924f775b..a1a8324598 100644 --- a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs @@ -19,6 +19,11 @@ public class Utf8JsonDaprSerializer : IDaprSerializer, ITransientDependency return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); } + public string SerializeToString(object obj) + { + return _jsonSerializer.Serialize(obj); + } + public object Deserialize(byte[] value, Type type) { return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs index c891f3a406..986c19bbbf 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs @@ -48,13 +48,20 @@ public class IncomingEventRecord : public IncomingEventInfo ToIncomingEventInfo() { - return new IncomingEventInfo( + var info = new IncomingEventInfo( Id, MessageId, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } public void MarkAsProcessed(DateTime processedTime) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs index 41c4d41ce4..fe639411a2 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs @@ -41,11 +41,18 @@ public class OutgoingEventRecord : public OutgoingEventInfo ToOutgoingEventInfo() { - return new OutgoingEventInfo( + var info = new OutgoingEventInfo( Id, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs index d28b28e4d6..372e4e3d77 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; @@ -40,4 +41,14 @@ public class IncomingEventInfo : IHasExtraProperties ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } + + public void SetCorrelationId(string correlationId) + { + ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId; + } + + public string GetCorrelationId() + { + return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString(); + } } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs index 359b33f3b3..299935741e 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Volo.Abp.Data; namespace Volo.Abp.EventBus.Distributed; @@ -36,4 +37,14 @@ public class OutgoingEventInfo : IHasExtraProperties ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } -} \ No newline at end of file + + public void SetCorrelationId(string correlationId) + { + ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId; + } + + public string GetCorrelationId() + { + return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString(); + } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs new file mode 100644 index 0000000000..f1fa920670 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs @@ -0,0 +1,6 @@ +namespace Volo.Abp.EventBus; + +public static class EventBusConsts +{ + public const string CorrelationIdHeaderName = "X-Correlation-Id"; +} diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 8279a412a5..d7ca68bee6 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; @@ -14,6 +15,7 @@ using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Azure; @@ -42,7 +44,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen IAzureServiceBusMessageConsumerFactory messageConsumerFactory, IPublisherPool publisherPool, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base(serviceScopeFactory, currentTenant, unitOfWorkManager, @@ -50,7 +53,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { Options = abpAzureEventBusOptions.Value; Serializer = serializer; @@ -86,24 +90,30 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); - if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData)) + if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData, message.CorrelationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(message.CorrelationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.Id); + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -125,18 +135,23 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen message.MessageId = outgoingEvent.Id.ToString(); } + message.CorrelationId = outgoingEvent.GetCorrelationId(); + if (!messageBatch.TryAddMessage(message)) { throw new AbpException( "The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number"); } - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } } await publisher.SendMessagesAsync(messageBatch); @@ -152,7 +167,10 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); @@ -244,12 +262,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var body = Serializer.Serialize(eventData); - return PublishAsync(eventName, body, null); + return PublishAsync(eventName, body, CorrelationIdProvider.Get(), null); } protected virtual async Task PublishAsync( string eventName, byte[] body, + [CanBeNull] string correlationId, Guid? eventId) { var message = new ServiceBusMessage(body) @@ -262,6 +281,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen message.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } + message.CorrelationId = correlationId; + var publisher = await PublisherPool.GetAsync( Options.TopicName, Options.ConnectionName); diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs new file mode 100644 index 0000000000..ee08586b8d --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs @@ -0,0 +1,23 @@ +namespace Volo.Abp.EventBus.Dapr; + +public class AbpDaprEventData +{ + public string PubSubName { get; set; } + + public string Topic { get; set; } + + public string MessageId { get; set; } + + public string JsonData { get; set; } + + public string CorrelationId { get; set; } + + public AbpDaprEventData(string pubSubName, string topic, string messageId, string jsonData, string correlationId) + { + PubSubName = pubSubName; + Topic = topic; + MessageId = messageId; + JsonData = jsonData; + CorrelationId = correlationId; + } +} diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 23cf8aa1e9..4912a58388 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -13,6 +13,7 @@ using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Dapr; @@ -39,8 +40,17 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend IDaprSerializer serializer, IOptions daprEventBusOptions, IAbpDaprClientFactory daprClientFactory, - ILocalEventBus localEventBus) - : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) + : base(serviceScopeFactory, + currentTenant, + unitOfWorkManager, + abpDistributedEventBusOptions, + guidGenerator, + clock, + eventHandlerInvoker, + localEventBus, + correlationIdProvider) { Serializer = serializer; DaprEventBusOptions = daprEventBusOptions.Value; @@ -119,9 +129,9 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - protected override async Task PublishToEventBusAsync(Type eventType, object eventData) + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishToDaprAsync(eventType, eventData); + await PublishToDaprAsync(eventType, eventData, null, CorrelationIdProvider.Get()); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -141,43 +151,52 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return handlerFactoryList.ToArray(); } - public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) + public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); } - public override async Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { var outgoingEventArray = outgoingEvents.ToArray(); foreach (var outgoingEvent in outgoingEventArray) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); } } - public virtual async Task TriggerHandlersAsync(string messageId, Type eventType, object eventData) + public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, string messageId = null, string correlationId = null) { - if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData)) + if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) @@ -190,7 +209,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); @@ -226,15 +248,16 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return EventTypes.GetOrDefault(eventName); } - protected virtual async Task PublishToDaprAsync(Type eventType, object eventData) + protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string correlationId = null) { - await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData); + await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, messageId, correlationId); } - protected virtual async Task PublishToDaprAsync(string eventName, object eventData) + protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string correlationId = null) { var client = DaprClientFactory.Create(); - await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData); + var data = new AbpDaprEventData(DaprEventBusOptions.PubSubName, eventName, (messageId ?? GuidGenerator.Create()).ToString("N"), Serializer.SerializeToString(eventData), correlationId); + await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: data); } private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 62e161fad1..b49ffc07a9 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -14,6 +14,7 @@ using Volo.Abp.Kafka; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Kafka; @@ -42,7 +43,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, @@ -51,7 +53,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; MessageConsumerFactory = messageConsumerFactory; @@ -84,13 +87,17 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen var messageId = message.GetMessageId(); var eventData = Serializer.Deserialize(message.Value, eventType); + var correlationId = message.GetCorrelationId(); - if (await AddToInboxAsync(messageId, eventName, eventType, eventData)) + if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) @@ -163,14 +170,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { + var headers = new Headers + { + { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } + }; + + if (CorrelationIdProvider.Get() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!)); + } + await PublishAsync( AbpKafkaEventBusOptions.TopicName, eventType, eventData, - new Headers - { - { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } - } + headers ); } @@ -179,25 +193,34 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public override async Task PublishFromOutboxAsync( + public async override Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } + + var headers = new Headers + { + { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } + }; + if (outgoingEvent.GetCorrelationId() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); + } await PublishAsync( AbpKafkaEventBusOptions.TopicName, outgoingEvent.EventName, outgoingEvent.EventData, - new Headers - { - { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } - } + headers ); } @@ -214,12 +237,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)} }; - await TriggerDistributedEventSentAsync(new DistributedEventSent() + if (outgoingEvent.GetCorrelationId() != null) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); + } + + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) + { + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } producer.Produce( AbpKafkaEventBusOptions.TopicName, @@ -244,7 +275,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs index 17a80ec87c..569e56a1dc 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs @@ -15,4 +15,16 @@ public static class MessageExtensions return messageId; } + + public static string GetCorrelationId(this Message message) + { + string correlationId = null; + + if (message.Headers.TryGetLastBytes(EventBusConsts.CorrelationIdHeaderName, out var correlationIdBytes)) + { + correlationId = System.Text.Encoding.UTF8.GetString(correlationIdBytes); + } + + return correlationId; + } } diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 8d4756165a..a2459ea8d4 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using RabbitMQ.Client; @@ -15,6 +16,7 @@ using Volo.Abp.MultiTenancy; using Volo.Abp.RabbitMQ; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.RabbitMq; @@ -49,7 +51,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, @@ -58,7 +61,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { ConnectionPool = connectionPool; Serializer = serializer; @@ -103,12 +107,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); - if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData)) + var correlationId = ea.BasicProperties.CorrelationId; + if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) @@ -186,7 +194,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishAsync(eventType, eventData, null); + await PublishAsync(eventType, eventData, correlationId: CorrelationIdProvider.Get()); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -194,18 +202,21 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public override async Task PublishFromOutboxAsync( + public async override Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, null, eventId: outgoingEvent.Id); + await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId()); } public async override Task PublishManyFromOutboxAsync( @@ -219,19 +230,22 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe foreach (var outgoingEvent in outgoingEventArray) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } await PublishAsync( channel, outgoingEvent.EventName, outgoingEvent.EventData, - properties: null, - eventId: outgoingEvent.Id); + eventId: outgoingEvent.Id, + correlationId: outgoingEvent.GetCorrelationId()); } channel.WaitForConfirmsOrDie(); @@ -250,7 +264,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); @@ -262,28 +279,29 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe return Serializer.Serialize(eventData); } - public Task PublishAsync( + public virtual Task PublishAsync( Type eventType, object eventData, - IBasicProperties properties, - Dictionary headersArguments = null) + Dictionary headersArguments = null, + Guid? eventId = null, + [CanBeNull] string correlationId = null) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); - return PublishAsync(eventName, body, properties, headersArguments); + return PublishAsync( eventName, body, headersArguments, eventId, correlationId); } protected virtual Task PublishAsync( string eventName, byte[] body, - IBasicProperties properties, Dictionary headersArguments = null, - Guid? eventId = null) + Guid? eventId = null, + [CanBeNull] string correlationId = null) { using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) { - return PublishAsync(channel, eventName, body, properties, headersArguments, eventId); + return PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId); } } @@ -291,23 +309,25 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe IModel channel, string eventName, byte[] body, - IBasicProperties properties, Dictionary headersArguments = null, - Guid? eventId = null) + Guid? eventId = null, + [CanBeNull] string correlationId = null) { EnsureExchangeExists(channel); - if (properties == null) - { - properties = channel.CreateBasicProperties(); - properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; - } + var properties = channel.CreateBasicProperties(); + properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; if (properties.MessageId.IsNullOrEmpty()) { properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } + if (correlationId != null) + { + properties.CorrelationId = correlationId; + } + SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish( diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 0983e98836..560b13ba14 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -16,6 +16,7 @@ using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Rebus; @@ -43,7 +44,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) : + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, @@ -52,7 +54,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen guidGenerator, clock, eventHandlerInvoker, - localEventBus) + localEventBus, + correlationIdProvider) { Rebus = rebus; Serializer = serializer; @@ -144,18 +147,27 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var messageId = MessageContext.Current.TransportMessage.GetMessageId(); var eventName = EventNameAttribute.GetNameOrDefault(eventType); + var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName); - if (await AddToInboxAsync(messageId, eventName, eventType, eventData)) + if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) { return; } - await TriggerHandlersDirectAsync(eventType, eventData); + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - await PublishAsync(eventType, eventData); + var headers = new Dictionary(); + if (CorrelationIdProvider.Get() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, CorrelationIdProvider.Get()); + } + await PublishAsync(eventType, eventData, headersArguments: headers); } protected virtual async Task PublishAsync( @@ -234,21 +246,29 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return false; } - public override async Task PublishFromOutboxAsync( + public async override Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } - await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id); + var headers = new Dictionary(); + if (outgoingEvent.GetCorrelationId() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()); + } + + await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) @@ -259,12 +279,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { foreach (var outgoingEvent in outgoingEventArray) { - await TriggerDistributedEventSentAsync(new DistributedEventSent() + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { - Source = DistributedEventSource.Outbox, - EventName = outgoingEvent.EventName, - EventData = outgoingEvent.EventData - }); + await TriggerDistributedEventSentAsync(new DistributedEventSent() + { + Source = DistributedEventSource.Outbox, + EventName = outgoingEvent.EventName, + EventData = outgoingEvent.EventData + }); + } await PublishFromOutboxAsync(outgoingEvent, outboxConfig); } @@ -285,7 +308,10 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); - await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) + { + await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig); + } if (exceptions.Any()) { ThrowOriginalExceptions(eventType, exceptions); diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 89423ce111..2ca5c85bb0 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -2,12 +2,14 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Local; using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Timing; +using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Distributed; @@ -18,6 +20,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB protected IClock Clock { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected ILocalEventBus LocalEventBus { get; } + protected ICorrelationIdProvider CorrelationIdProvider { get; } protected DistributedEventBusBase( IServiceScopeFactory serviceScopeFactory, @@ -27,7 +30,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, - ILocalEventBus localEventBus) : base( + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, @@ -37,6 +41,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB Clock = clock; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; LocalEventBus = localEventBus; + CorrelationIdProvider = correlationIdProvider; } public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class @@ -129,14 +134,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB EventData = eventData }); - await eventOutbox.EnqueueAsync( - new OutgoingEventInfo( - GuidGenerator.Create(), - eventName, - Serialize(eventData), - Clock.Now - ) + var outgoingEventInfo = new OutgoingEventInfo( + GuidGenerator.Create(), + eventName, + Serialize(eventData), + Clock.Now ); + outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get()); + await eventOutbox.EnqueueAsync(outgoingEventInfo); return true; } } @@ -153,7 +158,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB string messageId, string eventName, Type eventType, - object eventData) + object eventData, + [CanBeNull] string correlationId) { if (AbpDistributedEventBusOptions.Inboxes.Count <= 0) { @@ -177,22 +183,25 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB } } - await TriggerDistributedEventReceivedAsync(new DistributedEventReceived + using (CorrelationIdProvider.Change(correlationId)) { - Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData - }); - - await eventInbox.EnqueueAsync( - new IncomingEventInfo( - GuidGenerator.Create(), - messageId, - eventName, - Serialize(eventData), - Clock.Now - ) + await TriggerDistributedEventReceivedAsync(new DistributedEventReceived + { + Source = DistributedEventSource.Direct, + EventName = EventNameAttribute.GetNameOrDefault(eventType), + EventData = eventData + }); + } + + var incomingEventInfo = new IncomingEventInfo( + GuidGenerator.Create(), + messageId, + eventName, + Serialize(eventData), + Clock.Now ); + incomingEventInfo.SetCorrelationId(correlationId); + await eventInbox.EnqueueAsync(incomingEventInfo); } } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs index 22c1d860d1..e6ddc87bd6 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs @@ -48,13 +48,20 @@ public class IncomingEventRecord : public IncomingEventInfo ToIncomingEventInfo() { - return new IncomingEventInfo( + var info = new IncomingEventInfo( Id, MessageId, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } public void MarkAsProcessed(DateTime processedTime) diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs index 15dfac38f6..0f0798532b 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs @@ -41,11 +41,18 @@ public class OutgoingEventRecord : public OutgoingEventInfo ToOutgoingEventInfo() { - return new OutgoingEventInfo( + var info = new OutgoingEventInfo( Id, EventName, EventData, CreationTime ); + + foreach (var property in ExtraProperties) + { + info.SetProperty(property.Key, property.Value); + } + + return info; } } diff --git a/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs new file mode 100644 index 0000000000..185d206dc6 --- /dev/null +++ b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs @@ -0,0 +1,17 @@ +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.AspNetCore.Mvc; +using Volo.Abp.Tracing; + +namespace Volo.Abp.AspNetCore.CorrelationIdProvider; + +[Route("api/correlation")] +public class CorrelationIdProviderController : AbpController +{ + [HttpGet] + [Route("get")] + public string Get() + { + return this.HttpContext.RequestServices.GetRequiredService().Get(); + } +} diff --git a/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs new file mode 100644 index 0000000000..0db761f57d --- /dev/null +++ b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs @@ -0,0 +1,48 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; +using Shouldly; +using Volo.Abp.AspNetCore.Mvc; +using Xunit; + +namespace Volo.Abp.AspNetCore.CorrelationIdProvider; + +public class CorrelationIdProvider_Tests : AspNetCoreMvcTestBase +{ + [Fact] + public async Task Test() + { + // Test AbpCorrelationIdMiddleware without X-Correlation-Id header + using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/404")) + { + var response = await Client.SendAsync(requestMessage); + response.StatusCode.ShouldBe(HttpStatusCode.NotFound); + + response.Headers.ShouldContain(x => x.Key == "X-Correlation-Id" && x.Value.First() != null); + } + + var correlationId = Guid.NewGuid().ToString("N"); + + // Test AbpCorrelationIdMiddleware + using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/404")) + { + requestMessage.Headers.Add("X-Correlation-Id", correlationId); + var response = await Client.SendAsync(requestMessage); + response.StatusCode.ShouldBe(HttpStatusCode.NotFound); + + response.Headers.ShouldContain(x => x.Key == "X-Correlation-Id" && x.Value.First() == correlationId); + } + + // Test AspNetCoreCorrelationIdProvider + using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/get")) + { + requestMessage.Headers.Add("X-Correlation-Id", correlationId); + var response = await Client.SendAsync(requestMessage); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + (await response.Content.ReadAsStringAsync()).ShouldBe(correlationId); + } + } +} diff --git a/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs new file mode 100644 index 0000000000..997038afa1 --- /dev/null +++ b/framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Volo.Abp.Modularity; +using Volo.Abp.Tracing; +using Xunit; + +namespace Volo.Abp.CorrelationIdProvider; + +public class CorrelationIdProvider_Tests +{ + [Fact] + public async Task Test() + { + using (var application = await AbpApplicationFactory.CreateAsync()) + { + await application.InitializeAsync(); + + var correlationIdProvider = application.ServiceProvider.GetRequiredService(); + + correlationIdProvider.Get().ShouldBeNull(); + + var correlationId = Guid.NewGuid().ToString("N"); + using (correlationIdProvider.Change(correlationId)) + { + correlationIdProvider.Get().ShouldBe(correlationId); + } + + correlationIdProvider.Get().ShouldBeNull(); + } + } +}