diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs index 1bf220cad..50aae0259 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs @@ -40,5 +40,9 @@ namespace Squidex.Extensions.Actions.Kafka [DataType(DataType.MultilineText)] [Formattable] public string Headers { get; set; } + + [Display(Name = "Schema (Optional)", Description = "Define a specific AVRO schema in JSON format.")] + [DataType(DataType.MultilineText)] + public string Schema { get; set; } } } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs index e7e248334..dd5964e83 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs @@ -54,7 +54,8 @@ namespace Squidex.Extensions.Actions.Kafka TopicName = action.TopicName, MessageKey = key, MessageValue = value, - Headers = ParseHeaders(action.Headers, @event) + Headers = ParseHeaders(action.Headers, @event), + Schema = action.Schema }; return (Description, ruleJob); @@ -105,13 +106,13 @@ namespace Squidex.Extensions.Actions.Kafka } } - await kafkaProducer.Send(job.TopicName, message); + await kafkaProducer.Send(job.TopicName, message, job.Schema); return Result.Success($"Event pushed to {job.TopicName} kafka topic."); } catch (Exception ex) { - return Result.Failed(ex, "Push to Kafka failed."); + return Result.Failed(ex, $"Push to Kafka failed: {ex}"); } } } @@ -125,5 +126,7 @@ namespace Squidex.Extensions.Actions.Kafka public string MessageValue { get; set; } public Dictionary Headers { get; set; } + + public string Schema { get; set; } } } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs index 7f34fd2e3..a8f2ecfaa 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs @@ -5,20 +5,34 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; +using System.Collections.Generic; using System.Threading.Tasks; +using Avro; +using Avro.Generic; using Confluent.Kafka; +using Confluent.SchemaRegistry; +using Confluent.SchemaRegistry.Serdes; +using GraphQL.Types; using Microsoft.Extensions.Options; +using Squidex.Infrastructure.Json; +using Squidex.Infrastructure.Json.Objects; using Squidex.Infrastructure.Log; namespace Squidex.Extensions.Actions.Kafka { public sealed class KafkaProducer { - private readonly IProducer producer; + private readonly IProducer textProducer; + private readonly IProducer avroProducer; + private readonly ISchemaRegistryClient schemaRegistry; + private readonly IJsonSerializer jsonSerializer; - public KafkaProducer(IOptions options, ISemanticLog log) + public KafkaProducer(IOptions options, ISemanticLog log, IJsonSerializer jsonSerializer) { - producer = new ProducerBuilder(options.Value) + this.jsonSerializer = jsonSerializer; + + textProducer = new ProducerBuilder(options.Value) .SetErrorHandler((p, error) => { LogError(log, error); @@ -30,6 +44,24 @@ namespace Squidex.Extensions.Actions.Kafka .SetKeySerializer(Serializers.Utf8) .SetValueSerializer(Serializers.Utf8) .Build(); + + if (options.Value.IsSchemaRegistryConfigured()) + { + schemaRegistry = new CachedSchemaRegistryClient(options.Value.SchemaRegistry); + + avroProducer = new ProducerBuilder(options.Value) + .SetErrorHandler((p, error) => + { + LogError(log, error); + }) + .SetLogHandler((p, message) => + { + LogMessage(log, message); + }) + .SetKeySerializer(Serializers.Utf8) + .SetValueSerializer(new AvroSerializer(schemaRegistry, options.Value.AvroSerializer)) + .Build(); + } } private static void LogMessage(ISemanticLog log, LogMessage message) @@ -77,14 +109,80 @@ namespace Squidex.Extensions.Actions.Kafka .WriteProperty("reason", error.Reason)); } - public async Task> Send(string topicName, Message message) + public async Task> Send(string topicName, Message message, string schema) { - return await producer.ProduceAsync(topicName, message); + if (!string.IsNullOrWhiteSpace(schema)) + { + var value = CreateAvroRecord(message.Value, schema); + + var avroMessage = new Message { Key = message.Key, Headers = message.Headers, Value = value }; + + await avroProducer.ProduceAsync(topicName, avroMessage); + } + + return await textProducer.ProduceAsync(topicName, message); + } + + private GenericRecord CreateAvroRecord(string json, string avroSchema) + { + var schema = (RecordSchema)Avro.Schema.Parse(avroSchema); + + var jsonObject = jsonSerializer.Deserialize(json); + + var result = (GenericRecord)GetValue(jsonObject, schema); + + return result; } public void Dispose() { - producer?.Dispose(); + textProducer?.Dispose(); + avroProducer?.Dispose(); + } + + private object GetValue(IJsonValue value, Avro.Schema schema) + { + switch (value) + { + case JsonString s: + return s.Value; + case JsonNumber n: + return n.Value; + case JsonBoolean b: + return b.Value; + case JsonObject o: + { + var recordSchema = (RecordSchema)schema; + + var result = new GenericRecord(recordSchema); + + foreach (var (key, childValue) in o) + { + if (recordSchema.TryGetField(key, out var field)) + { + result.Add(key, GetValue(childValue, field.Schema)); + } + } + + return result; + } + + case JsonArray a: + { + var arraySchema = (ArraySchema)schema; + + var result = new List(); + + foreach (var item in a) + { + result.Add(GetValue(item, arraySchema.ItemSchema)); + } + + return result.ToArray(); + } + } + + return null; } } } diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs index e1fd7b7fc..6a85799d2 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs @@ -6,14 +6,25 @@ // ========================================================================== using Confluent.Kafka; +using Confluent.SchemaRegistry; +using Confluent.SchemaRegistry.Serdes; namespace Squidex.Extensions.Actions.Kafka { public class KafkaProducerOptions : ProducerConfig { + public SchemaRegistryConfig SchemaRegistry { get; set; } + + public AvroSerializerConfig AvroSerializer { get; set; } + public bool IsProducerConfigured() { return !string.IsNullOrWhiteSpace(BootstrapServers); } + + public bool IsSchemaRegistryConfigured() + { + return !string.IsNullOrWhiteSpace(SchemaRegistry?.Url); + } } } diff --git a/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj b/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj index 1f57f09ce..69b9b94df 100644 --- a/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj +++ b/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj @@ -9,7 +9,9 @@ + +