|
|
|
@ -168,7 +168,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); |
|
|
|
} |
|
|
|
|
|
|
|
protected async override Task PublishToEventBusAsync(Type eventType, object eventData) |
|
|
|
protected override async Task PublishToEventBusAsync(Type eventType, object eventData) |
|
|
|
{ |
|
|
|
var headers = new Headers |
|
|
|
{ |
|
|
|
@ -193,7 +193,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
unitOfWork.AddOrReplaceDistributedEvent(eventRecord); |
|
|
|
} |
|
|
|
|
|
|
|
public async override Task PublishFromOutboxAsync( |
|
|
|
public override async Task PublishFromOutboxAsync( |
|
|
|
OutgoingEventInfo outgoingEvent, |
|
|
|
OutboxConfig outboxConfig) |
|
|
|
{ |
|
|
|
@ -206,13 +206,18 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); |
|
|
|
} |
|
|
|
|
|
|
|
await PublishAsync( |
|
|
|
var result = await PublishAsync( |
|
|
|
AbpKafkaEventBusOptions.TopicName, |
|
|
|
outgoingEvent.EventName, |
|
|
|
outgoingEvent.EventData, |
|
|
|
headers |
|
|
|
); |
|
|
|
|
|
|
|
if (result.Status != PersistenceStatus.Persisted) |
|
|
|
{ |
|
|
|
throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'. Status: {result.Status}"); |
|
|
|
} |
|
|
|
|
|
|
|
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) |
|
|
|
{ |
|
|
|
await TriggerDistributedEventSentAsync(new DistributedEventSent() |
|
|
|
@ -224,7 +229,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig) |
|
|
|
public override async Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig) |
|
|
|
{ |
|
|
|
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); |
|
|
|
var outgoingEventArray = outgoingEvents.ToArray(); |
|
|
|
@ -242,7 +247,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); |
|
|
|
} |
|
|
|
|
|
|
|
producer.Produce( |
|
|
|
var result = await producer.ProduceAsync( |
|
|
|
AbpKafkaEventBusOptions.TopicName, |
|
|
|
new Message<string, byte[]> |
|
|
|
{ |
|
|
|
@ -251,6 +256,11 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
Headers = headers |
|
|
|
}); |
|
|
|
|
|
|
|
if (result.Status != PersistenceStatus.Persisted) |
|
|
|
{ |
|
|
|
throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'. Status: {result.Status}"); |
|
|
|
} |
|
|
|
|
|
|
|
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) |
|
|
|
{ |
|
|
|
await TriggerDistributedEventSentAsync(new DistributedEventSent() |
|
|
|
@ -263,7 +273,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public async override Task ProcessFromInboxAsync( |
|
|
|
public override async Task ProcessFromInboxAsync( |
|
|
|
IncomingEventInfo incomingEvent, |
|
|
|
InboxConfig inboxConfig) |
|
|
|
{ |
|
|
|
@ -290,12 +300,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
return Serializer.Serialize(eventData); |
|
|
|
} |
|
|
|
|
|
|
|
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) |
|
|
|
private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) |
|
|
|
{ |
|
|
|
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|
|
|
var body = Serializer.Serialize(eventData); |
|
|
|
|
|
|
|
return PublishAsync(topicName, eventName, body, headers); |
|
|
|
var result = await PublishAsync(topicName, eventName, body, headers); |
|
|
|
if (result.Status != PersistenceStatus.Persisted) |
|
|
|
{ |
|
|
|
throw new AbpException($"Failed to publish event '{eventName}' to topic '{topicName}'. Status: {result.Status}"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Task<DeliveryResult<string, byte[]>> PublishAsync( |
|
|
|
|