diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs index d79a39032..2979c23a4 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs @@ -94,7 +94,7 @@ namespace Squidex.Extensions.Actions.Kafka { try { - await kafkaProducer.SendAsync(job); + await kafkaProducer.SendAsync(job, ct); return Result.Success($"Event pushed to {job.TopicName} kafka topic."); } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs index ca47ae46f..6085824ea 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading; using System.Threading.Tasks; using Avro; using Avro.Generic; @@ -111,7 +112,7 @@ namespace Squidex.Extensions.Actions.Kafka .WriteProperty("reason", error.Reason)); } - public async Task SendAsync(KafkaJob job) + public async Task SendAsync(KafkaJob job, CancellationToken ct) { if (!string.IsNullOrWhiteSpace(job.Schema)) { @@ -119,17 +120,17 @@ namespace Squidex.Extensions.Actions.Kafka var message = new Message { Value = value }; - await ProduceAsync(avroProducer, message, job); + await ProduceAsync(avroProducer, message, job, ct); } else { var message = new Message { Value = job.MessageValue }; - await ProduceAsync(textProducer, message, job); + await ProduceAsync(textProducer, message, job, ct); } } - private async Task ProduceAsync(IProducer producer, Message message, KafkaJob job) + private async Task ProduceAsync(IProducer producer, Message message, KafkaJob job, CancellationToken ct) { message.Key = job.MessageKey; @@ -147,11 +148,11 @@ namespace Squidex.Extensions.Actions.Kafka { var partition = Math.Abs(job.PartitionKey.GetHashCode()) % job.PartitionCount; - await producer.ProduceAsync(new TopicPartition(job.TopicName, partition), message); + await producer.ProduceAsync(new TopicPartition(job.TopicName, partition), message, ct); } else { - await producer.ProduceAsync(job.TopicName, message); + await producer.ProduceAsync(job.TopicName, message, ct); } }