diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 4dba728c51..aa79c81f50 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -162,13 +162,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { await PublishAsync( + AbpKafkaEventBusOptions.TopicName, eventType, eventData, new Headers { - { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } - }, - null + { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } + } ); } @@ -188,42 +188,31 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen new Headers { { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } - }, - null + } ); } public override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) { - var producer = ProducerPool.Get(); + var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); var outgoingEventArray = outgoingEvents.ToArray(); - producer.BeginTransaction(); - try + + foreach (var outgoingEvent in outgoingEventArray) { - foreach (var outgoingEvent in outgoingEventArray) + var messageId = outgoingEvent.Id.ToString("N"); + var headers = new Headers { - var messageId = outgoingEvent.Id.ToString("N"); - var headers = new Headers - { - { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)} - }; - - producer.Produce( - AbpKafkaEventBusOptions.TopicName, - new Message - { - Key = outgoingEvent.EventName, - Value = outgoingEvent.EventData, - Headers = headers - }); - } + { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)} + }; - producer.CommitTransaction(); - } - catch (Exception e) - { - producer.AbortTransaction(); - throw; + producer.Produce( + AbpKafkaEventBusOptions.TopicName, + new Message + { + Key = outgoingEvent.EventName, + Value = outgoingEvent.EventData, + Headers = headers + }); } return Task.CompletedTask; @@ -253,47 +242,22 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return Serializer.Serialize(eventData); } - public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary headersArguments) - { - await PublishAsync( - AbpKafkaEventBusOptions.TopicName, - eventType, - eventData, - headers, - headersArguments - ); - } - - private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary headersArguments) + private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); - return PublishAsync(topicName, eventName, body, headers, headersArguments); + return PublishAsync(topicName, eventName, body, headers); } private Task> PublishAsync( string topicName, string eventName, byte[] body, - Headers headers, - Dictionary headersArguments) + Headers headers) { var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); - return PublishAsync(producer, topicName, eventName, body, headers, headersArguments); - } - - private Task> PublishAsync( - IProducer producer, - string topicName, - string eventName, - byte[] body, - Headers headers, - Dictionary headersArguments) - { - SetEventMessageHeaders(headers, headersArguments); - return producer.ProduceAsync( topicName, new Message @@ -304,20 +268,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen }); } - private void SetEventMessageHeaders(Headers headers, Dictionary headersArguments) - { - if (headersArguments == null) - { - return; - } - - foreach (var header in headersArguments) - { - headers.Remove(header.Key); - headers.Add(header.Key, Serializer.Serialize(header.Value)); - } - } - private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs index 0a31a3483a..691a29c1d2 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs @@ -41,16 +41,8 @@ public class ProducerPool : IProducerPool, ISingletonDependency { var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection)); Options.ConfigureProducer?.Invoke(producerConfig); - - if (producerConfig.TransactionalId.IsNullOrWhiteSpace()) - { - producerConfig.TransactionalId = Guid.NewGuid().ToString(); - } - - var producer = new ProducerBuilder(producerConfig).Build(); - producer.InitTransactions(DefaultTransactionsWaitDuration); + return new ProducerBuilder(producerConfig).Build(); - return producer; })).Value; }