|
|
|
@ -1,6 +1,5 @@ |
|
|
|
using System; |
|
|
|
using System.Collections.Concurrent; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Diagnostics; |
|
|
|
using System.Linq; |
|
|
|
using Confluent.Kafka; |
|
|
|
@ -15,7 +14,7 @@ namespace Volo.Abp.Kafka |
|
|
|
{ |
|
|
|
protected AbpKafkaOptions Options { get; } |
|
|
|
|
|
|
|
protected ConcurrentDictionary<string, IConsumer<string, byte[]>> Consumers { get; } |
|
|
|
protected ConcurrentDictionary<string, Lazy<IConsumer<string, byte[]>>> Consumers { get; } |
|
|
|
|
|
|
|
protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); |
|
|
|
|
|
|
|
@ -27,7 +26,7 @@ namespace Volo.Abp.Kafka |
|
|
|
{ |
|
|
|
Options = options.Value; |
|
|
|
|
|
|
|
Consumers = new ConcurrentDictionary<string, IConsumer<string, byte[]>>(); |
|
|
|
Consumers = new ConcurrentDictionary<string, Lazy<IConsumer<string, byte[]>>>(); |
|
|
|
Logger = new NullLogger<ConsumerPool>(); |
|
|
|
} |
|
|
|
|
|
|
|
@ -36,7 +35,7 @@ namespace Volo.Abp.Kafka |
|
|
|
connectionName ??= KafkaConnections.DefaultConnectionName; |
|
|
|
|
|
|
|
return Consumers.GetOrAdd( |
|
|
|
connectionName, connection => |
|
|
|
connectionName, connection => new Lazy<IConsumer<string, byte[]>>(() => |
|
|
|
{ |
|
|
|
var config = new ConsumerConfig(Options.Connections.GetOrDefault(connection)) |
|
|
|
{ |
|
|
|
@ -45,10 +44,9 @@ namespace Volo.Abp.Kafka |
|
|
|
}; |
|
|
|
|
|
|
|
Options.ConfigureConsumer?.Invoke(config); |
|
|
|
|
|
|
|
return new ConsumerBuilder<string, byte[]>(config).Build(); |
|
|
|
} |
|
|
|
); |
|
|
|
}) |
|
|
|
).Value; |
|
|
|
} |
|
|
|
|
|
|
|
public void Dispose() |
|
|
|
@ -78,8 +76,8 @@ namespace Volo.Abp.Kafka |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
consumer.Close(); |
|
|
|
consumer.Dispose(); |
|
|
|
consumer.Value.Close(); |
|
|
|
consumer.Value.Dispose(); |
|
|
|
} |
|
|
|
catch |
|
|
|
{ |
|
|
|
|