diff --git a/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj b/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
index f66761fc2a..e56bd38916 100644
--- a/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
+++ b/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
@@ -5,6 +5,8 @@
netstandard2.0;netstandard2.1;net7.0
+ enable
+ Nullable
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs
index b48a952615..11b443f72b 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs
@@ -8,11 +8,11 @@ public class AbpKafkaOptions
{
public KafkaConnections Connections { get; }
- public Action ConfigureProducer { get; set; }
+ public Action? ConfigureProducer { get; set; }
- public Action ConfigureConsumer { get; set; }
+ public Action? ConfigureConsumer { get; set; }
- public Action ConfigureTopic { get; set; }
+ public Action? ConfigureTopic { get; set; }
public AbpKafkaOptions()
{
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs
index 7c73df0d51..48943c84c4 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs
@@ -30,7 +30,7 @@ public class ConsumerPool : IConsumerPool, ISingletonDependency
Logger = new NullLogger();
}
- public virtual IConsumer Get(string groupId, string connectionName = null)
+ public virtual IConsumer Get(string groupId, string? connectionName = null)
{
connectionName ??= KafkaConnections.DefaultConnectionName;
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs
index 4666a0175f..a3f484ef56 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs
@@ -5,5 +5,5 @@ namespace Volo.Abp.Kafka;
public interface IConsumerPool : IDisposable
{
- IConsumer Get(string groupId, string connectionName = null);
+ IConsumer Get(string groupId, string? connectionName = null);
}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs
index e21c8c9fd2..985e20625f 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs
@@ -14,5 +14,5 @@ public interface IKafkaMessageConsumerFactory
IKafkaMessageConsumer Create(
string topicName,
string groupId,
- string connectionName = null);
+ string? connectionName = null);
}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs
index 8930184016..378075b2ec 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs
@@ -5,5 +5,5 @@ namespace Volo.Abp.Kafka;
public interface IProducerPool : IDisposable
{
- IProducer Get(string connectionName = null);
+ IProducer Get(string? connectionName = null);
}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs
index 3d9f2fe950..e66e074ad6 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs
@@ -21,8 +21,10 @@ public class KafkaConnections : Dictionary
Default = new ClientConfig();
}
- public ClientConfig GetOrDefault(string connectionName)
+ public ClientConfig GetOrDefault(string? connectionName)
{
+ connectionName ??= DefaultConnectionName;
+
if (TryGetValue(connectionName, out var connectionFactory))
{
return connectionFactory;
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs
index 704c15845f..5a2ba38189 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs
@@ -31,13 +31,13 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency,
protected ConcurrentBag, Task>> Callbacks { get; }
- protected IConsumer Consumer { get; private set; }
+ protected IConsumer? Consumer { get; private set; }
- protected string ConnectionName { get; private set; }
+ protected string? ConnectionName { get; private set; }
- protected string GroupId { get; private set; }
+ protected string GroupId { get; private set; } = default!;
- protected string TopicName { get; private set; }
+ protected string TopicName { get; private set; } = default!;
public KafkaMessageConsumer(
IConsumerPool consumerPool,
@@ -63,7 +63,7 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency,
public virtual void Initialize(
[NotNull] string topicName,
[NotNull] string groupId,
- string connectionName = null)
+ string? connectionName = null)
{
Check.NotNull(topicName, nameof(topicName));
Check.NotNull(groupId, nameof(groupId));
@@ -160,7 +160,7 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency,
}
finally
{
- Consumer.Commit(consumeResult);
+ Consumer?.Commit(consumeResult);
}
}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs
index dfc5912d55..fe46b5fe38 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs
@@ -16,7 +16,7 @@ public class KafkaMessageConsumerFactory : IKafkaMessageConsumerFactory, ISingle
public IKafkaMessageConsumer Create(
string topicName,
string groupId,
- string connectionName = null)
+ string? connectionName = null)
{
var consumer = ServiceScope.ServiceProvider.GetRequiredService();
consumer.Initialize(topicName, groupId, connectionName);
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
index ac2edf31a7..23b9b71a57 100644
--- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
@@ -32,7 +32,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency
Logger = new NullLogger();
}
- public virtual IProducer Get(string connectionName = null)
+ public virtual IProducer Get(string? connectionName = null)
{
connectionName ??= KafkaConnections.DefaultConnectionName;