diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs index 50aae0259..7703544ec 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs @@ -36,6 +36,15 @@ namespace Squidex.Extensions.Actions.Kafka [Formattable] public string Key { get; set; } + [Display(Name = "Partition Key", Description = "The partition key, only used when we don't want to define partiontionig with key.")] + [DataType(DataType.Text)] + [Formattable] + public string PartitionKey { get; set; } + + [Display(Name = "Partition Count", Description = "Define the number of partitions for specific topic.")] + [DataType(DataType.Text)] + public int PartitionCount { get; set; } + [Display(Name = "Headers (Optional)", Description = "The message headers in the format '[Key]=[Value]', one entry per line.")] [DataType(DataType.MultilineText)] [Formattable] diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs index aa7b060fc..d79a39032 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs @@ -7,10 +7,8 @@ using System; using System.Collections.Generic; -using System.Text; using System.Threading; using System.Threading.Tasks; -using Confluent.Kafka; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Core.Rules.EnrichedEvents; @@ -55,7 +53,9 @@ namespace Squidex.Extensions.Actions.Kafka MessageKey = key, MessageValue = value, Headers = await ParseHeadersAsync(action.Headers, @event), - Schema = action.Schema + Schema = action.Schema, + PartitionKey = await FormatAsync(action.PartitionKey, @event), + PartitionCount = action.PartitionCount }; return (Description, ruleJob); @@ -94,19 +94,7 @@ namespace Squidex.Extensions.Actions.Kafka { try { - var message = new Message { Key = job.MessageKey, Value = job.MessageValue }; - - if (job.Headers?.Count > 0) - { - message.Headers = new Headers(); - - foreach (var header in job.Headers) - { - message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value)); - } - } - - await kafkaProducer.Send(job.TopicName, message, job.Schema); + await kafkaProducer.SendAsync(job); return Result.Success($"Event pushed to {job.TopicName} kafka topic."); } @@ -128,5 +116,9 @@ namespace Squidex.Extensions.Actions.Kafka public Dictionary Headers { get; set; } public string Schema { get; set; } + + public string PartitionKey { get; set; } + + public int PartitionCount { get; set; } } } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs index ec7d8c5b7..85c2b758f 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs @@ -5,7 +5,9 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.Collections.Generic; +using System.Text; using System.Threading.Tasks; using Avro; using Avro.Generic; @@ -107,18 +109,48 @@ namespace Squidex.Extensions.Actions.Kafka .WriteProperty("reason", error.Reason)); } - public async Task> Send(string topicName, Message message, string schema) + public async Task SendAsync(KafkaJob job) { - if (!string.IsNullOrWhiteSpace(schema)) + if (!string.IsNullOrWhiteSpace(job.Schema)) { - var value = CreateAvroRecord(message.Value, schema); + var value = CreateAvroRecord(job.MessageValue, job.Schema); - var avroMessage = new Message { Key = message.Key, Headers = message.Headers, Value = value }; + var message = new Message { Value = value }; - await avroProducer.ProduceAsync(topicName, avroMessage); + await ProduceAsync(avroProducer, message, job); } + else + { + var message = new Message { Value = job.MessageValue }; + + await ProduceAsync(textProducer, message, job); + } + } + + private async Task ProduceAsync(IProducer producer, Message message, KafkaJob job) + { + message.Key = job.MessageKey; + + if (job.Headers?.Count > 0) + { + message.Headers = new Headers(); - return await textProducer.ProduceAsync(topicName, message); + foreach (var header in job.Headers) + { + message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value)); + } + } + + if (!string.IsNullOrWhiteSpace(job.PartitionKey) && job.PartitionCount > 0) + { + var partition = Math.Abs(job.PartitionKey.GetHashCode()) % job.PartitionCount; + + await producer.ProduceAsync(new TopicPartition(job.TopicName, partition), message); + } + else + { + await producer.ProduceAsync(job.TopicName, message); + } } private GenericRecord CreateAvroRecord(string json, string avroSchema)