Browse Source

Merge pull request #12935 from abpframework/liangshiwei/rebus

Add messageId to Rebus event bus
pull/12945/head
maliming 4 years ago
committed by GitHub
parent
commit
d99687a43c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

17
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Rebus.Bus; using Rebus.Bus;
using Rebus.Messages;
using Rebus.Pipeline; using Rebus.Pipeline;
using Rebus.Transport; using Rebus.Transport;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
@ -154,7 +155,11 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
await PublishAsync(eventType, eventData); await PublishAsync(eventType, eventData);
} }
protected virtual async Task PublishAsync(Type eventType, object eventData) protected virtual async Task PublishAsync(
Type eventType,
object eventData,
Guid? eventId = null,
Dictionary<string, string> headersArguments = null)
{ {
if (AbpRebusEventBusOptions.Publish != null) if (AbpRebusEventBusOptions.Publish != null)
{ {
@ -162,7 +167,13 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return; return;
} }
await Rebus.Publish(eventData); headersArguments ??= new Dictionary<string, string>();
if (!headersArguments.ContainsKey(Headers.MessageId))
{
headersArguments[Headers.MessageId] = (eventId ?? GuidGenerator.Create()).ToString("N");
}
await Rebus.Publish(eventData, headersArguments);
} }
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
@ -221,7 +232,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
return PublishToEventBusAsync(eventType, eventData); return PublishAsync(eventType, eventData, eventId: outgoingEvent.Id);
} }
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig) public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)

Loading…
Cancel
Save