Browse Source

Merge pull request #13158 from abpframework/liangshiwei/kafka-patch

Fix Kafka problem
pull/13161/head
maliming 4 years ago
committed by GitHub
parent
commit
1da21efeb7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 94
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  2. 10
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs

94
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -162,13 +162,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected async override Task PublishToEventBusAsync(Type eventType, object eventData) protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{ {
await PublishAsync( await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventType, eventType,
eventData, eventData,
new Headers new Headers
{ {
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
}, }
null
); );
} }
@ -188,42 +188,31 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
new Headers new Headers
{ {
{ "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) } { "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) }
}, }
null
); );
} }
public override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig) public override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{ {
var producer = ProducerPool.Get(); var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
var outgoingEventArray = outgoingEvents.ToArray(); var outgoingEventArray = outgoingEvents.ToArray();
producer.BeginTransaction();
try foreach (var outgoingEvent in outgoingEventArray)
{ {
foreach (var outgoingEvent in outgoingEventArray) var messageId = outgoingEvent.Id.ToString("N");
var headers = new Headers
{ {
var messageId = outgoingEvent.Id.ToString("N"); { "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)}
var headers = new Headers };
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)}
};
producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});
}
producer.CommitTransaction(); producer.Produce(
} AbpKafkaEventBusOptions.TopicName,
catch (Exception e) new Message<string, byte[]>
{ {
producer.AbortTransaction(); Key = outgoingEvent.EventName,
throw; Value = outgoingEvent.EventData,
Headers = headers
});
} }
return Task.CompletedTask; return Task.CompletedTask;
@ -253,47 +242,22 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return Serializer.Serialize(eventData); return Serializer.Serialize(eventData);
} }
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments) private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
{
await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventType,
eventData,
headers,
headersArguments
);
}
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{ {
var eventName = EventNameAttribute.GetNameOrDefault(eventType); var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData); var body = Serializer.Serialize(eventData);
return PublishAsync(topicName, eventName, body, headers, headersArguments); return PublishAsync(topicName, eventName, body, headers);
} }
private Task<DeliveryResult<string, byte[]>> PublishAsync( private Task<DeliveryResult<string, byte[]>> PublishAsync(
string topicName, string topicName,
string eventName, string eventName,
byte[] body, byte[] body,
Headers headers, Headers headers)
Dictionary<string, object> headersArguments)
{ {
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
return PublishAsync(producer, topicName, eventName, body, headers, headersArguments);
}
private Task<DeliveryResult<string, byte[]>> PublishAsync(
IProducer<string, byte[]> producer,
string topicName,
string eventName,
byte[] body,
Headers headers,
Dictionary<string, object> headersArguments)
{
SetEventMessageHeaders(headers, headersArguments);
return producer.ProduceAsync( return producer.ProduceAsync(
topicName, topicName,
new Message<string, byte[]> new Message<string, byte[]>
@ -304,20 +268,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
}); });
} }
private void SetEventMessageHeaders(Headers headers, Dictionary<string, object> headersArguments)
{
if (headersArguments == null)
{
return;
}
foreach (var header in headersArguments)
{
headers.Remove(header.Key);
headers.Add(header.Key, Serializer.Serialize(header.Value));
}
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{ {
return HandlerFactories.GetOrAdd( return HandlerFactories.GetOrAdd(

10
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs

@ -41,16 +41,8 @@ public class ProducerPool : IProducerPool, ISingletonDependency
{ {
var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection)); var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection));
Options.ConfigureProducer?.Invoke(producerConfig); Options.ConfigureProducer?.Invoke(producerConfig);
return new ProducerBuilder<string, byte[]>(producerConfig).Build();
if (producerConfig.TransactionalId.IsNullOrWhiteSpace())
{
producerConfig.TransactionalId = Guid.NewGuid().ToString();
}
var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();
producer.InitTransactions(DefaultTransactionsWaitDuration);
return producer;
})).Value; })).Value;
} }

Loading…
Cancel
Save