diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs index e1866bd99..1bf220cad 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs @@ -25,5 +25,20 @@ namespace Squidex.Extensions.Actions.Kafka [DataType(DataType.Text)] [Formattable] public string TopicName { get; set; } + + [Display(Name = "Payload (Optional)", Description = "Leave it empty to use the full event as body.")] + [DataType(DataType.MultilineText)] + [Formattable] + public string Payload { get; set; } + + [Display(Name = "Key", Description = "The message key, commonly used for partitioning.")] + [DataType(DataType.Text)] + [Formattable] + public string Key { get; set; } + + [Display(Name = "Headers (Optional)", Description = "The message headers in the format '[Key]=[Value]', one entry per line.")] + [DataType(DataType.MultilineText)] + [Formattable] + public string Headers { get; set; } } } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs index 28d01719a..e7e248334 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs @@ -6,8 +6,11 @@ // ========================================================================== using System; +using System.Collections.Generic; +using System.Text; using System.Threading; using System.Threading.Tasks; +using Confluent.Kafka; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Core.Rules.EnrichedEvents; @@ -26,21 +29,83 @@ namespace Squidex.Extensions.Actions.Kafka protected override (string Description, KafkaJob Data) CreateJob(EnrichedEvent @event, KafkaAction action) { + string value, key; + + if (!string.IsNullOrEmpty(action.Payload)) + { + value = Format(action.Payload, @event); + } + else + { + value = ToEnvelopeJson(@event); + } + + if (!string.IsNullOrEmpty(action.Key)) + { + key = Format(action.Key, @event); + } + else + { + key = @event.Name; + } + var ruleJob = new KafkaJob { TopicName = action.TopicName, - MessageKey = @event.Name, - MessageValue = ToEnvelopeJson(@event) + MessageKey = key, + MessageValue = value, + Headers = ParseHeaders(action.Headers, @event) }; return (Description, ruleJob); } + private Dictionary ParseHeaders(string headers, EnrichedEvent @event) + { + if (string.IsNullOrWhiteSpace(headers)) + { + return null; + } + + var headersDictionary = new Dictionary(); + + var lines = headers.Split('\n'); + + foreach (var line in lines) + { + var indexEqual = line.IndexOf('='); + + if (indexEqual > 0 && indexEqual < line.Length - 1) + { + var key = line.Substring(0, indexEqual); + var val = line.Substring(indexEqual + 1); + + val = Format(val, @event); + + headersDictionary[key] = val; + } + } + + return headersDictionary; + } + protected override async Task ExecuteJobAsync(KafkaJob job, CancellationToken ct = default) { try { - await kafkaProducer.Send(job.TopicName, job.MessageKey, job.MessageValue); + var message = new Message { Key = job.MessageKey, Value = job.MessageValue }; + + if (job.Headers?.Count > 0) + { + message.Headers = new Headers(); + + foreach (var header in job.Headers) + { + message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value)); + } + } + + await kafkaProducer.Send(job.TopicName, message); return Result.Success($"Event pushed to {job.TopicName} kafka topic."); } @@ -58,5 +123,7 @@ namespace Squidex.Extensions.Actions.Kafka public string MessageKey { get; set; } public string MessageValue { get; set; } + + public Dictionary Headers { get; set; } } } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs index cc74e2b90..7f34fd2e3 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs @@ -77,10 +77,8 @@ namespace Squidex.Extensions.Actions.Kafka .WriteProperty("reason", error.Reason)); } - public async Task> Send(string topicName, string key, string value) + public async Task> Send(string topicName, Message message) { - var message = new Message { Key = key, Value = value }; - return await producer.ProduceAsync(topicName, message); }