|
|
|
@ -31,13 +31,13 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency, |
|
|
|
|
|
|
|
protected ConcurrentBag<Func<Message<string, byte[]>, Task>> Callbacks { get; } |
|
|
|
|
|
|
|
protected IConsumer<string, byte[]> Consumer { get; private set; } |
|
|
|
protected IConsumer<string, byte[]>? Consumer { get; private set; } |
|
|
|
|
|
|
|
protected string ConnectionName { get; private set; } |
|
|
|
protected string? ConnectionName { get; private set; } |
|
|
|
|
|
|
|
protected string GroupId { get; private set; } |
|
|
|
protected string GroupId { get; private set; } = default!; |
|
|
|
|
|
|
|
protected string TopicName { get; private set; } |
|
|
|
protected string TopicName { get; private set; } = default!; |
|
|
|
|
|
|
|
public KafkaMessageConsumer( |
|
|
|
IConsumerPool consumerPool, |
|
|
|
@ -63,7 +63,7 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency, |
|
|
|
public virtual void Initialize( |
|
|
|
[NotNull] string topicName, |
|
|
|
[NotNull] string groupId, |
|
|
|
string connectionName = null) |
|
|
|
string? connectionName = null) |
|
|
|
{ |
|
|
|
Check.NotNull(topicName, nameof(topicName)); |
|
|
|
Check.NotNull(groupId, nameof(groupId)); |
|
|
|
@ -160,7 +160,7 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency, |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
Consumer.Commit(consumeResult); |
|
|
|
Consumer?.Commit(consumeResult); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|