From ab549a901167b63b71f930445dcb550d012c636c Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Wed, 8 Jun 2022 17:39:35 +0800 Subject: [PATCH] Add messageId to Rebus event bus --- .../EventBus/Rebus/RebusDistributedEventBus.cs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 4ccd72ba15..bfe763d960 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Rebus.Bus; +using Rebus.Messages; using Rebus.Pipeline; using Rebus.Transport; using Volo.Abp.DependencyInjection; @@ -154,7 +155,11 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen await PublishAsync(eventType, eventData); } - protected virtual async Task PublishAsync(Type eventType, object eventData) + protected virtual async Task PublishAsync( + Type eventType, + object eventData, + Guid? eventId = null, + Dictionary headersArguments = null) { if (AbpRebusEventBusOptions.Publish != null) { @@ -162,7 +167,13 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return; } - await Rebus.Publish(eventData); + headersArguments ??= new Dictionary(); + if (!headersArguments.ContainsKey(Headers.MessageId)) + { + headersArguments[Headers.MessageId] = (eventId ?? GuidGenerator.Create()).ToString("N"); + } + + await Rebus.Publish(eventData, headersArguments); } protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) @@ -221,7 +232,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - return PublishToEventBusAsync(eventType, eventData); + return PublishAsync(eventType, eventData, eventId: outgoingEvent.Id); } public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig)