Browse Source

Merge pull request #16795 from abpframework/add-CorrelationId-to-distributed-events

Add `CorrelationId` to distributed events.
pull/16885/head
Halil İbrahim Kalkan 3 years ago
committed by GitHub
parent
commit
8795097e4d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs
  2. 28
      framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs
  3. 51
      framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs
  4. 18
      framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs
  5. 7
      framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs
  6. 2
      framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs
  7. 5
      framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs
  8. 9
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs
  9. 9
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs
  10. 11
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs
  11. 13
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs
  12. 6
      framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs
  13. 55
      framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs
  14. 23
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs
  15. 75
      framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
  16. 82
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  17. 12
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs
  18. 88
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  19. 62
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  20. 55
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs
  21. 9
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs
  22. 9
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs
  23. 17
      framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs
  24. 48
      framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs
  25. 33
      framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs

29
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs

@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
@ -21,7 +22,6 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController
var daprSerializer = HttpContext.RequestServices.GetRequiredService<IDaprSerializer>();
var body = (await JsonDocument.ParseAsync(HttpContext.Request.Body));
var id = body.RootElement.GetProperty("id").GetString();
var pubSubName = body.RootElement.GetProperty("pubsubname").GetString();
var topic = body.RootElement.GetProperty("topic").GetString();
var data = body.RootElement.GetProperty("data").GetRawText();
@ -32,8 +32,31 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController
}
var distributedEventBus = HttpContext.RequestServices.GetRequiredService<DaprDistributedEventBus>();
var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic));
await distributedEventBus.TriggerHandlersAsync(id, distributedEventBus.GetEventType(topic), eventData);
if (IsAbpDaprEventData(data))
{
var daprEventData = daprSerializer.Deserialize(data, typeof(AbpDaprEventData)).As<AbpDaprEventData>();
var eventData = daprSerializer.Deserialize(daprEventData.JsonData, distributedEventBus.GetEventType(daprEventData.Topic));
await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(daprEventData.Topic), eventData, daprEventData.MessageId, daprEventData.CorrelationId);
}
else
{
var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic));
await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(topic), eventData);
}
return Ok();
}
protected virtual bool IsAbpDaprEventData(string data)
{
var document = JsonDocument.Parse(data);
var objects = document.RootElement.EnumerateObject().ToList();
return objects.Count == 5 &&
objects.Any(x => x.Name.Equals("PubSubName", StringComparison.CurrentCultureIgnoreCase)) &&
objects.Any(x => x.Name.Equals("Topic", StringComparison.CurrentCultureIgnoreCase)) &&
objects.Any(x => x.Name.Equals("MessageId", StringComparison.CurrentCultureIgnoreCase)) &&
objects.Any(x => x.Name.Equals("JsonData", StringComparison.CurrentCultureIgnoreCase)) &&
objects.Any(x => x.Name.Equals("CorrelationId", StringComparison.CurrentCultureIgnoreCase));
}
}

28
framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AbpCorrelationIdMiddleware.cs

@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Http;
using System;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
@ -20,16 +21,31 @@ public class AbpCorrelationIdMiddleware : IMiddleware, ITransientDependency
public async Task InvokeAsync(HttpContext context, RequestDelegate next)
{
var correlationId = _correlationIdProvider.Get();
var correlationId = GetCorrelationIdFromRequest(context);
try
using (_correlationIdProvider.Change(correlationId))
{
await next(context);
try
{
await next(context);
}
finally
{
CheckAndSetCorrelationIdOnResponse(context, _options, correlationId);
}
}
finally
}
protected virtual string GetCorrelationIdFromRequest(HttpContext context)
{
string correlationId = context.Request.Headers[_options.HttpHeaderName];
if (correlationId.IsNullOrEmpty())
{
CheckAndSetCorrelationIdOnResponse(context, _options, correlationId);
correlationId = Guid.NewGuid().ToString("N");
context.Request.Headers[_options.HttpHeaderName] = correlationId;
}
return correlationId;
}
protected virtual void CheckAndSetCorrelationIdOnResponse(

51
framework/src/Volo.Abp.AspNetCore/Volo/Abp/AspNetCore/Tracing/AspNetCoreCorrelationIdProvider.cs

@ -1,51 +0,0 @@
using System;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Tracing;
namespace Volo.Abp.AspNetCore.Tracing;
[Dependency(ReplaceServices = true)]
public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider, ITransientDependency
{
protected IHttpContextAccessor HttpContextAccessor { get; }
protected AbpCorrelationIdOptions Options { get; }
public AspNetCoreCorrelationIdProvider(
IHttpContextAccessor httpContextAccessor,
IOptions<AbpCorrelationIdOptions> options)
{
HttpContextAccessor = httpContextAccessor;
Options = options.Value;
}
public virtual string Get()
{
if (HttpContextAccessor.HttpContext?.Request?.Headers == null)
{
return CreateNewCorrelationId();
}
string correlationId = HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName];
if (correlationId.IsNullOrEmpty())
{
lock (HttpContextAccessor.HttpContext.Request.Headers)
{
if (correlationId.IsNullOrEmpty())
{
correlationId = CreateNewCorrelationId();
HttpContextAccessor.HttpContext.Request.Headers[Options.HttpHeaderName] = correlationId;
}
}
}
return correlationId;
}
protected virtual string CreateNewCorrelationId()
{
return Guid.NewGuid().ToString("N");
}
}

18
framework/src/Volo.Abp.Core/Volo/Abp/Tracing/DefaultCorrelationIdProvider.cs

@ -1,17 +1,27 @@
using System;
using System.Threading;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Tracing;
public class DefaultCorrelationIdProvider : ICorrelationIdProvider, ISingletonDependency
{
public string Get()
private readonly AsyncLocal<string?> _currentCorrelationId = new AsyncLocal<string?>();
private string? CorrelationId => _currentCorrelationId.Value;
public virtual string? Get()
{
return CreateNewCorrelationId();
return CorrelationId;
}
protected virtual string CreateNewCorrelationId()
public virtual IDisposable Change(string? correlationId)
{
return Guid.NewGuid().ToString("N");
var parent = CorrelationId;
_currentCorrelationId.Value = correlationId;
return new DisposeAction(() =>
{
_currentCorrelationId.Value = parent;
});
}
}

7
framework/src/Volo.Abp.Core/Volo/Abp/Tracing/ICorrelationIdProvider.cs

@ -1,9 +1,10 @@
using JetBrains.Annotations;
using System;
namespace Volo.Abp.Tracing;
public interface ICorrelationIdProvider
{
[NotNull]
string Get();
string? Get();
IDisposable Change(string? correlationId);
}

2
framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs

@ -6,6 +6,8 @@ public interface IDaprSerializer
{
byte[] Serialize(object obj);
string SerializeToString(object obj);
object Deserialize(byte[] value, Type type);
object Deserialize(string value, Type type);

5
framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs

@ -19,6 +19,11 @@ public class Utf8JsonDaprSerializer : IDaprSerializer, ITransientDependency
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
}
public string SerializeToString(object obj)
{
return _jsonSerializer.Serialize(obj);
}
public object Deserialize(byte[] value, Type type)
{
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value));

9
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IncomingEventRecord.cs

@ -48,13 +48,20 @@ public class IncomingEventRecord :
public IncomingEventInfo ToIncomingEventInfo()
{
return new IncomingEventInfo(
var info = new IncomingEventInfo(
Id,
MessageId,
EventName,
EventData,
CreationTime
);
foreach (var property in ExtraProperties)
{
info.SetProperty(property.Key, property.Value);
}
return info;
}
public void MarkAsProcessed(DateTime processedTime)

9
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs

@ -41,11 +41,18 @@ public class OutgoingEventRecord :
public OutgoingEventInfo ToOutgoingEventInfo()
{
return new OutgoingEventInfo(
var info = new OutgoingEventInfo(
Id,
EventName,
EventData,
CreationTime
);
foreach (var property in ExtraProperties)
{
info.SetProperty(property.Key, property.Value);
}
return info;
}
}

11
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IncomingEventInfo.cs

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
@ -40,4 +41,14 @@ public class IncomingEventInfo : IHasExtraProperties
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public void SetCorrelationId(string correlationId)
{
ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId;
}
public string GetCorrelationId()
{
return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString();
}
}

13
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed;
@ -36,4 +37,14 @@ public class OutgoingEventInfo : IHasExtraProperties
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
}
public void SetCorrelationId(string correlationId)
{
ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId;
}
public string GetCorrelationId()
{
return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString();
}
}

6
framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventBusConsts.cs

@ -0,0 +1,6 @@
namespace Volo.Abp.EventBus;
public static class EventBusConsts
{
public const string CorrelationIdHeaderName = "X-Correlation-Id";
}

55
framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs

@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
@ -14,6 +15,7 @@ using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Azure;
@ -42,7 +44,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IAzureServiceBusMessageConsumerFactory messageConsumerFactory,
IPublisherPool publisherPool,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus)
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider)
: base(serviceScopeFactory,
currentTenant,
unitOfWorkManager,
@ -50,7 +53,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus)
localEventBus,
correlationIdProvider)
{
Options = abpAzureEventBusOptions.Value;
Serializer = serializer;
@ -86,24 +90,30 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType);
if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData))
if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData, message.CorrelationId))
{
return;
}
await TriggerHandlersDirectAsync(eventType, eventData);
using (CorrelationIdProvider.Change(message.CorrelationId))
{
await TriggerHandlersDirectAsync(eventType, eventData);
}
}
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.Id);
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id);
}
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
@ -125,18 +135,23 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
message.MessageId = outgoingEvent.Id.ToString();
}
message.CorrelationId = outgoingEvent.GetCorrelationId();
if (!messageBatch.TryAddMessage(message))
{
throw new AbpException(
"The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number");
}
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
}
await publisher.SendMessagesAsync(messageBatch);
@ -152,7 +167,10 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
@ -244,12 +262,13 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
var body = Serializer.Serialize(eventData);
return PublishAsync(eventName, body, null);
return PublishAsync(eventName, body, CorrelationIdProvider.Get(), null);
}
protected virtual async Task PublishAsync(
string eventName,
byte[] body,
[CanBeNull] string correlationId,
Guid? eventId)
{
var message = new ServiceBusMessage(body)
@ -262,6 +281,8 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
message.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
}
message.CorrelationId = correlationId;
var publisher = await PublisherPool.GetAsync(
Options.TopicName,
Options.ConnectionName);

23
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventData.cs

@ -0,0 +1,23 @@
namespace Volo.Abp.EventBus.Dapr;
public class AbpDaprEventData
{
public string PubSubName { get; set; }
public string Topic { get; set; }
public string MessageId { get; set; }
public string JsonData { get; set; }
public string CorrelationId { get; set; }
public AbpDaprEventData(string pubSubName, string topic, string messageId, string jsonData, string correlationId)
{
PubSubName = pubSubName;
Topic = topic;
MessageId = messageId;
JsonData = jsonData;
CorrelationId = correlationId;
}
}

75
framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs

@ -13,6 +13,7 @@ using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Dapr;
@ -39,8 +40,17 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
IDaprSerializer serializer,
IOptions<AbpDaprEventBusOptions> daprEventBusOptions,
IAbpDaprClientFactory daprClientFactory,
ILocalEventBus localEventBus)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus)
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider)
: base(serviceScopeFactory,
currentTenant,
unitOfWorkManager,
abpDistributedEventBusOptions,
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus,
correlationIdProvider)
{
Serializer = serializer;
DaprEventBusOptions = daprEventBusOptions.Value;
@ -119,9 +129,9 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishToDaprAsync(eventType, eventData);
await PublishToDaprAsync(eventType, eventData, null, CorrelationIdProvider.Get());
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
@ -141,43 +151,52 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return handlerFactoryList.ToArray();
}
public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)));
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());
}
public override async Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
var outgoingEventArray = outgoingEvents.ToArray();
foreach (var outgoingEvent in outgoingEventArray)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)));
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());
}
}
public virtual async Task TriggerHandlersAsync(string messageId, Type eventType, object eventData)
public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, string messageId = null, string correlationId = null)
{
if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData))
if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, correlationId))
{
return;
}
await TriggerHandlersDirectAsync(eventType, eventData);
using (CorrelationIdProvider.Change(correlationId))
{
await TriggerHandlersDirectAsync(eventType, eventData);
}
}
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
@ -190,7 +209,10 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
@ -226,15 +248,16 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
return EventTypes.GetOrDefault(eventName);
}
protected virtual async Task PublishToDaprAsync(Type eventType, object eventData)
protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string correlationId = null)
{
await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData);
await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, messageId, correlationId);
}
protected virtual async Task PublishToDaprAsync(string eventName, object eventData)
protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string correlationId = null)
{
var client = DaprClientFactory.Create();
await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData);
var data = new AbpDaprEventData(DaprEventBusOptions.PubSubName, eventName, (messageId ?? GuidGenerator.Create()).ToString("N"), Serializer.SerializeToString(eventData), correlationId);
await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: data);
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)

82
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -14,6 +14,7 @@ using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Kafka;
@ -42,7 +43,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus)
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider)
: base(
serviceScopeFactory,
currentTenant,
@ -51,7 +53,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus)
localEventBus,
correlationIdProvider)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
MessageConsumerFactory = messageConsumerFactory;
@ -84,13 +87,17 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var messageId = message.GetMessageId();
var eventData = Serializer.Deserialize(message.Value, eventType);
var correlationId = message.GetCorrelationId();
if (await AddToInboxAsync(messageId, eventName, eventType, eventData))
if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId))
{
return;
}
await TriggerHandlersDirectAsync(eventType, eventData);
using (CorrelationIdProvider.Change(correlationId))
{
await TriggerHandlersDirectAsync(eventType, eventData);
}
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
@ -163,14 +170,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
var headers = new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
};
if (CorrelationIdProvider.Get() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!));
}
await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventType,
eventData,
new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
}
headers
);
}
@ -179,25 +193,34 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
public override async Task PublishFromOutboxAsync(
public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
var headers = new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) }
};
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!));
}
await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
outgoingEvent.EventName,
outgoingEvent.EventData,
new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) }
}
headers
);
}
@ -214,12 +237,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{ "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)}
};
await TriggerDistributedEventSentAsync(new DistributedEventSent()
if (outgoingEvent.GetCorrelationId() != null)
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!));
}
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
producer.Produce(
AbpKafkaEventBusOptions.TopicName,
@ -244,7 +275,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);

12
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/MessageExtensions.cs

@ -15,4 +15,16 @@ public static class MessageExtensions
return messageId;
}
public static string GetCorrelationId<TKey, TValue>(this Message<TKey, TValue> message)
{
string correlationId = null;
if (message.Headers.TryGetLastBytes(EventBusConsts.CorrelationIdHeaderName, out var correlationIdBytes))
{
correlationId = System.Text.Encoding.UTF8.GetString(correlationIdBytes);
}
return correlationId;
}
}

88
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
@ -15,6 +16,7 @@ using Volo.Abp.MultiTenancy;
using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.RabbitMq;
@ -49,7 +51,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus)
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider)
: base(
serviceScopeFactory,
currentTenant,
@ -58,7 +61,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus)
localEventBus,
correlationIdProvider)
{
ConnectionPool = connectionPool;
Serializer = serializer;
@ -103,12 +107,16 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData))
var correlationId = ea.BasicProperties.CorrelationId;
if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId))
{
return;
}
await TriggerHandlersDirectAsync(eventType, eventData);
using (CorrelationIdProvider.Change(correlationId))
{
await TriggerHandlersDirectAsync(eventType, eventData);
}
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
@ -186,7 +194,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData, null);
await PublishAsync(eventType, eventData, correlationId: CorrelationIdProvider.Get());
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
@ -194,18 +202,21 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
public override async Task PublishFromOutboxAsync(
public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, null, eventId: outgoingEvent.Id);
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId());
}
public async override Task PublishManyFromOutboxAsync(
@ -219,19 +230,22 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
foreach (var outgoingEvent in outgoingEventArray)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishAsync(
channel,
outgoingEvent.EventName,
outgoingEvent.EventData,
properties: null,
eventId: outgoingEvent.Id);
eventId: outgoingEvent.Id,
correlationId: outgoingEvent.GetCorrelationId());
}
channel.WaitForConfirmsOrDie();
@ -250,7 +264,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
@ -262,28 +279,29 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
return Serializer.Serialize(eventData);
}
public Task PublishAsync(
public virtual Task PublishAsync(
Type eventType,
object eventData,
IBasicProperties properties,
Dictionary<string, object> headersArguments = null)
Dictionary<string, object> headersArguments = null,
Guid? eventId = null,
[CanBeNull] string correlationId = null)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
return PublishAsync(eventName, body, properties, headersArguments);
return PublishAsync( eventName, body, headersArguments, eventId, correlationId);
}
protected virtual Task PublishAsync(
string eventName,
byte[] body,
IBasicProperties properties,
Dictionary<string, object> headersArguments = null,
Guid? eventId = null)
Guid? eventId = null,
[CanBeNull] string correlationId = null)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
return PublishAsync(channel, eventName, body, properties, headersArguments, eventId);
return PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId);
}
}
@ -291,23 +309,25 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
IModel channel,
string eventName,
byte[] body,
IBasicProperties properties,
Dictionary<string, object> headersArguments = null,
Guid? eventId = null)
Guid? eventId = null,
[CanBeNull] string correlationId = null)
{
EnsureExchangeExists(channel);
if (properties == null)
{
properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
}
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
if (properties.MessageId.IsNullOrEmpty())
{
properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
}
if (correlationId != null)
{
properties.CorrelationId = correlationId;
}
SetEventMessageHeaders(properties, headersArguments);
channel.BasicPublish(

62
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -16,6 +16,7 @@ using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Rebus;
@ -43,7 +44,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus) :
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider) :
base(
serviceScopeFactory,
currentTenant,
@ -52,7 +54,8 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus)
localEventBus,
correlationIdProvider)
{
Rebus = rebus;
Serializer = serializer;
@ -144,18 +147,27 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
var messageId = MessageContext.Current.TransportMessage.GetMessageId();
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName);
if (await AddToInboxAsync(messageId, eventName, eventType, eventData))
if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId))
{
return;
}
await TriggerHandlersDirectAsync(eventType, eventData);
using (CorrelationIdProvider.Change(correlationId))
{
await TriggerHandlersDirectAsync(eventType, eventData);
}
}
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(eventType, eventData);
var headers = new Dictionary<string, string>();
if (CorrelationIdProvider.Get() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, CorrelationIdProvider.Get());
}
await PublishAsync(eventType, eventData, headersArguments: headers);
}
protected virtual async Task PublishAsync(
@ -234,21 +246,29 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return false;
}
public override async Task PublishFromOutboxAsync(
public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent() {
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id);
var headers = new Dictionary<string, string>();
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId());
}
await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers);
}
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
@ -259,12 +279,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
{
foreach (var outgoingEvent in outgoingEventArray)
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}
@ -285,7 +308,10 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId()))
{
await TriggerHandlersFromInboxAsync(eventType, eventData, exceptions, inboxConfig);
}
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);

55
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -2,12 +2,14 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Distributed;
@ -18,6 +20,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
protected IClock Clock { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected ILocalEventBus LocalEventBus { get; }
protected ICorrelationIdProvider CorrelationIdProvider { get; }
protected DistributedEventBusBase(
IServiceScopeFactory serviceScopeFactory,
@ -27,7 +30,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus) : base(
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider) : base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
@ -37,6 +41,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
Clock = clock;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
LocalEventBus = localEventBus;
CorrelationIdProvider = correlationIdProvider;
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
@ -129,14 +134,14 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
EventData = eventData
});
await eventOutbox.EnqueueAsync(
new OutgoingEventInfo(
GuidGenerator.Create(),
eventName,
Serialize(eventData),
Clock.Now
)
var outgoingEventInfo = new OutgoingEventInfo(
GuidGenerator.Create(),
eventName,
Serialize(eventData),
Clock.Now
);
outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get());
await eventOutbox.EnqueueAsync(outgoingEventInfo);
return true;
}
}
@ -153,7 +158,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
string messageId,
string eventName,
Type eventType,
object eventData)
object eventData,
[CanBeNull] string correlationId)
{
if (AbpDistributedEventBusOptions.Inboxes.Count <= 0)
{
@ -177,22 +183,25 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
}
}
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
using (CorrelationIdProvider.Change(correlationId))
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
await eventInbox.EnqueueAsync(
new IncomingEventInfo(
GuidGenerator.Create(),
messageId,
eventName,
Serialize(eventData),
Clock.Now
)
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
}
var incomingEventInfo = new IncomingEventInfo(
GuidGenerator.Create(),
messageId,
eventName,
Serialize(eventData),
Clock.Now
);
incomingEventInfo.SetCorrelationId(correlationId);
await eventInbox.EnqueueAsync(incomingEventInfo);
}
}
}

9
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IncomingEventRecord.cs

@ -48,13 +48,20 @@ public class IncomingEventRecord :
public IncomingEventInfo ToIncomingEventInfo()
{
return new IncomingEventInfo(
var info = new IncomingEventInfo(
Id,
MessageId,
EventName,
EventData,
CreationTime
);
foreach (var property in ExtraProperties)
{
info.SetProperty(property.Key, property.Value);
}
return info;
}
public void MarkAsProcessed(DateTime processedTime)

9
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/OutgoingEventRecord.cs

@ -41,11 +41,18 @@ public class OutgoingEventRecord :
public OutgoingEventInfo ToOutgoingEventInfo()
{
return new OutgoingEventInfo(
var info = new OutgoingEventInfo(
Id,
EventName,
EventData,
CreationTime
);
foreach (var property in ExtraProperties)
{
info.SetProperty(property.Key, property.Value);
}
return info;
}
}

17
framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProviderController.cs

@ -0,0 +1,17 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.Tracing;
namespace Volo.Abp.AspNetCore.CorrelationIdProvider;
[Route("api/correlation")]
public class CorrelationIdProviderController : AbpController
{
[HttpGet]
[Route("get")]
public string Get()
{
return this.HttpContext.RequestServices.GetRequiredService<ICorrelationIdProvider>().Get();
}
}

48
framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/CorrelationIdProvider/CorrelationIdProvider_Tests.cs

@ -0,0 +1,48 @@
using System;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.AspNetCore.Mvc;
using Xunit;
namespace Volo.Abp.AspNetCore.CorrelationIdProvider;
public class CorrelationIdProvider_Tests : AspNetCoreMvcTestBase
{
[Fact]
public async Task Test()
{
// Test AbpCorrelationIdMiddleware without X-Correlation-Id header
using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/404"))
{
var response = await Client.SendAsync(requestMessage);
response.StatusCode.ShouldBe(HttpStatusCode.NotFound);
response.Headers.ShouldContain(x => x.Key == "X-Correlation-Id" && x.Value.First() != null);
}
var correlationId = Guid.NewGuid().ToString("N");
// Test AbpCorrelationIdMiddleware
using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/404"))
{
requestMessage.Headers.Add("X-Correlation-Id", correlationId);
var response = await Client.SendAsync(requestMessage);
response.StatusCode.ShouldBe(HttpStatusCode.NotFound);
response.Headers.ShouldContain(x => x.Key == "X-Correlation-Id" && x.Value.First() == correlationId);
}
// Test AspNetCoreCorrelationIdProvider
using (var requestMessage = new HttpRequestMessage(HttpMethod.Get, "/api/correlation/get"))
{
requestMessage.Headers.Add("X-Correlation-Id", correlationId);
var response = await Client.SendAsync(requestMessage);
response.StatusCode.ShouldBe(HttpStatusCode.OK);
(await response.Content.ReadAsStringAsync()).ShouldBe(correlationId);
}
}
}

33
framework/test/Volo.Abp.Core.Tests/Volo/Abp/CorrelationIdProvider/CorrelationIdProvider_Tests.cs

@ -0,0 +1,33 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Shouldly;
using Volo.Abp.Modularity;
using Volo.Abp.Tracing;
using Xunit;
namespace Volo.Abp.CorrelationIdProvider;
public class CorrelationIdProvider_Tests
{
[Fact]
public async Task Test()
{
using (var application = await AbpApplicationFactory.CreateAsync<IndependentEmptyModule>())
{
await application.InitializeAsync();
var correlationIdProvider = application.ServiceProvider.GetRequiredService<ICorrelationIdProvider>();
correlationIdProvider.Get().ShouldBeNull();
var correlationId = Guid.NewGuid().ToString("N");
using (correlationIdProvider.Change(correlationId))
{
correlationIdProvider.Get().ShouldBe(correlationId);
}
correlationIdProvider.Get().ShouldBeNull();
}
}
}
Loading…
Cancel
Save