From 857f055ba0f09a6838e242975c17317f1f2292b3 Mon Sep 17 00:00:00 2001 From: maliming Date: Sun, 7 Dec 2025 13:15:58 +0800 Subject: [PATCH 1/4] Improve Kafka event publishing reliability --- .../Kafka/KafkaDistributedEventBus.cs | 30 ++++++++++++++----- .../Volo/Abp/Kafka/ProducerPool.cs | 18 ++++++----- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 4bdf6a2782..bf44fb6ae8 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -168,7 +168,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { var headers = new Headers { @@ -193,7 +193,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public async override Task PublishFromOutboxAsync( + public override async Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { @@ -206,13 +206,18 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); } - await PublishAsync( + var result = await PublishAsync( AbpKafkaEventBusOptions.TopicName, outgoingEvent.EventName, outgoingEvent.EventData, headers ); + if (result.Status != PersistenceStatus.Persisted) + { + throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'."); + } + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -224,7 +229,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } - public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + public override async Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); var outgoingEventArray = outgoingEvents.ToArray(); @@ -242,7 +247,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); } - producer.Produce( + var result = await producer.ProduceAsync( AbpKafkaEventBusOptions.TopicName, new Message { @@ -251,6 +256,11 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen Headers = headers }); + if (result.Status != PersistenceStatus.Persisted) + { + throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'."); + } + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -263,7 +273,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } - public async override Task ProcessFromInboxAsync( + public override async Task ProcessFromInboxAsync( IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { @@ -290,12 +300,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return Serializer.Serialize(eventData); } - private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) + private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); - return PublishAsync(topicName, eventName, body, headers); + var result = await PublishAsync(topicName, eventName, body, headers); + if (result.Status != PersistenceStatus.Persisted) + { + throw new AbpException($"Failed to publish event '{eventName}' to topic '{topicName}'."); + } } private Task> PublishAsync( diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs index 23b9b71a57..a22f5970a2 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs @@ -17,7 +17,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency protected ConcurrentDictionary>> Producers { get; } protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); - + protected TimeSpan DefaultTransactionsWaitDuration { get; set; } = TimeSpan.FromSeconds(30); public ILogger Logger { get; set; } @@ -41,8 +41,10 @@ public class ProducerPool : IProducerPool, ISingletonDependency { var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection).ToDictionary(k => k.Key, v => v.Value)); Options.ConfigureProducer?.Invoke(producerConfig); + producerConfig.Acks ??= Acks.All; + producerConfig.EnableIdempotence ??= true; return new ProducerBuilder(producerConfig).Build(); - + })).Value; } @@ -70,7 +72,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency foreach (var producer in Producers.Values) { var poolItemDisposeStopwatch = Stopwatch.StartNew(); - + try { producer.Value.Dispose(); @@ -78,19 +80,19 @@ public class ProducerPool : IProducerPool, ISingletonDependency catch { } - + poolItemDisposeStopwatch.Stop(); - + remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed ? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed) : TimeSpan.Zero; } - + poolDisposeStopwatch.Stop(); - + Logger.LogInformation( $"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms)."); - + if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0) { Logger.LogWarning( From f1cc82b6b12b891d42e519c602a281c4d5b26974 Mon Sep 17 00:00:00 2001 From: Ma Liming Date: Sun, 7 Dec 2025 13:28:29 +0800 Subject: [PATCH 2/4] Update framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index bf44fb6ae8..8e9d012d7f 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -215,7 +215,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen if (result.Status != PersistenceStatus.Persisted) { - throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'."); + throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'. Status: {result.Status}"); } using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) From ed62dd63229e2a772ba9ee68cfff2b84251395bc Mon Sep 17 00:00:00 2001 From: Ma Liming Date: Sun, 7 Dec 2025 13:28:37 +0800 Subject: [PATCH 3/4] Update framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 8e9d012d7f..f9f93c755c 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -258,7 +258,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen if (result.Status != PersistenceStatus.Persisted) { - throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'."); + throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'. Status: {result.Status}"); } using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) From 947da61fbbdb3c119c02a5111c6d2ada58bc8777 Mon Sep 17 00:00:00 2001 From: Ma Liming Date: Sun, 7 Dec 2025 13:28:44 +0800 Subject: [PATCH 4/4] Update framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index f9f93c755c..a3376d9429 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -308,7 +308,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen var result = await PublishAsync(topicName, eventName, body, headers); if (result.Status != PersistenceStatus.Persisted) { - throw new AbpException($"Failed to publish event '{eventName}' to topic '{topicName}'."); + throw new AbpException($"Failed to publish event '{eventName}' to topic '{topicName}'. Status: {result.Status}"); } }