diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 4bdf6a2782..a3376d9429 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -168,7 +168,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { var headers = new Headers { @@ -193,7 +193,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - public async override Task PublishFromOutboxAsync( + public override async Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { @@ -206,13 +206,18 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); } - await PublishAsync( + var result = await PublishAsync( AbpKafkaEventBusOptions.TopicName, outgoingEvent.EventName, outgoingEvent.EventData, headers ); + if (result.Status != PersistenceStatus.Persisted) + { + throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'. Status: {result.Status}"); + } + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -224,7 +229,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } - public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + public override async Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); var outgoingEventArray = outgoingEvents.ToArray(); @@ -242,7 +247,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!)); } - producer.Produce( + var result = await producer.ProduceAsync( AbpKafkaEventBusOptions.TopicName, new Message { @@ -251,6 +256,11 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen Headers = headers }); + if (result.Status != PersistenceStatus.Persisted) + { + throw new AbpException($"Failed to publish event '{outgoingEvent.EventName}' to topic '{AbpKafkaEventBusOptions.TopicName}'. Status: {result.Status}"); + } + using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { await TriggerDistributedEventSentAsync(new DistributedEventSent() @@ -263,7 +273,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } - public async override Task ProcessFromInboxAsync( + public override async Task ProcessFromInboxAsync( IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { @@ -290,12 +300,16 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return Serializer.Serialize(eventData); } - private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) + private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); - return PublishAsync(topicName, eventName, body, headers); + var result = await PublishAsync(topicName, eventName, body, headers); + if (result.Status != PersistenceStatus.Persisted) + { + throw new AbpException($"Failed to publish event '{eventName}' to topic '{topicName}'. Status: {result.Status}"); + } } private Task> PublishAsync( diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs index 23b9b71a57..a22f5970a2 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs @@ -17,7 +17,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency protected ConcurrentDictionary>> Producers { get; } protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); - + protected TimeSpan DefaultTransactionsWaitDuration { get; set; } = TimeSpan.FromSeconds(30); public ILogger Logger { get; set; } @@ -41,8 +41,10 @@ public class ProducerPool : IProducerPool, ISingletonDependency { var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection).ToDictionary(k => k.Key, v => v.Value)); Options.ConfigureProducer?.Invoke(producerConfig); + producerConfig.Acks ??= Acks.All; + producerConfig.EnableIdempotence ??= true; return new ProducerBuilder(producerConfig).Build(); - + })).Value; } @@ -70,7 +72,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency foreach (var producer in Producers.Values) { var poolItemDisposeStopwatch = Stopwatch.StartNew(); - + try { producer.Value.Dispose(); @@ -78,19 +80,19 @@ public class ProducerPool : IProducerPool, ISingletonDependency catch { } - + poolItemDisposeStopwatch.Stop(); - + remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed ? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed) : TimeSpan.Zero; } - + poolDisposeStopwatch.Stop(); - + Logger.LogInformation( $"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms)."); - + if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0) { Logger.LogWarning(