Browse Source

Fix Kafka problem

pull/13158/head
liangshiwei 4 years ago
parent
commit
4ab0f7fe8c
  1. 94
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  2. 10
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs

94
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -162,13 +162,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventType,
eventData,
new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
},
null
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
}
);
}
@ -188,42 +188,31 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) }
},
null
}
);
}
public override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
var producer = ProducerPool.Get();
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
var outgoingEventArray = outgoingEvents.ToArray();
producer.BeginTransaction();
try
foreach (var outgoingEvent in outgoingEventArray)
{
foreach (var outgoingEvent in outgoingEventArray)
var messageId = outgoingEvent.Id.ToString("N");
var headers = new Headers
{
var messageId = outgoingEvent.Id.ToString("N");
var headers = new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)}
};
producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});
}
{ "messageId", System.Text.Encoding.UTF8.GetBytes(messageId)}
};
producer.CommitTransaction();
}
catch (Exception e)
{
producer.AbortTransaction();
throw;
producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});
}
return Task.CompletedTask;
@ -253,47 +242,22 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return Serializer.Serialize(eventData);
}
public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
await PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventType,
eventData,
headers,
headersArguments
);
}
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
return PublishAsync(topicName, eventName, body, headers, headersArguments);
return PublishAsync(topicName, eventName, body, headers);
}
private Task<DeliveryResult<string, byte[]>> PublishAsync(
string topicName,
string eventName,
byte[] body,
Headers headers,
Dictionary<string, object> headersArguments)
Headers headers)
{
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
return PublishAsync(producer, topicName, eventName, body, headers, headersArguments);
}
private Task<DeliveryResult<string, byte[]>> PublishAsync(
IProducer<string, byte[]> producer,
string topicName,
string eventName,
byte[] body,
Headers headers,
Dictionary<string, object> headersArguments)
{
SetEventMessageHeaders(headers, headersArguments);
return producer.ProduceAsync(
topicName,
new Message<string, byte[]>
@ -304,20 +268,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
});
}
private void SetEventMessageHeaders(Headers headers, Dictionary<string, object> headersArguments)
{
if (headersArguments == null)
{
return;
}
foreach (var header in headersArguments)
{
headers.Remove(header.Key);
headers.Add(header.Key, Serializer.Serialize(header.Value));
}
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(

10
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs

@ -41,16 +41,8 @@ public class ProducerPool : IProducerPool, ISingletonDependency
{
var producerConfig = new ProducerConfig(Options.Connections.GetOrDefault(connection));
Options.ConfigureProducer?.Invoke(producerConfig);
if (producerConfig.TransactionalId.IsNullOrWhiteSpace())
{
producerConfig.TransactionalId = Guid.NewGuid().ToString();
}
var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();
producer.InitTransactions(DefaultTransactionsWaitDuration);
return new ProducerBuilder<string, byte[]>(producerConfig).Build();
return producer;
})).Value;
}

Loading…
Cancel
Save