From 4ed68373a0c6796ea9ac54225e0e792f0eaa2471 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Tue, 23 Mar 2021 19:54:22 +0100 Subject: [PATCH] ICIS-VEGA : FOSS-9 - Added support for nullable fields of type union in Kafka. Enhanced Kafka rule success message with message key. (#677) Co-authored-by: Mittul Madaan --- .../Actions/Kafka/KafkaActionHandler.cs | 2 +- .../Actions/Kafka/KafkaProducer.cs | 71 +++++++++++++++---- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs index f02e978c4..53a9bb293 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs @@ -96,7 +96,7 @@ namespace Squidex.Extensions.Actions.Kafka { await kafkaProducer.SendAsync(job, ct); - return Result.Success($"Event pushed to {job.TopicName} kafka topic."); + return Result.Success($"Event pushed to {job.TopicName} kafka topic with {job.MessageKey} message key."); } catch (Exception ex) { diff --git a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs index 9529c2df1..b951a7937 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs @@ -184,7 +184,7 @@ namespace Squidex.Extensions.Actions.Kafka { switch (value) { - case JsonString s: + case JsonString s when IsTypeOrUnionWith(schema, Schema.Type.String): return s.Value; case JsonNumber n when IsTypeOrUnionWith(schema, Schema.Type.Long): return (long)n.Value; @@ -192,44 +192,87 @@ namespace Squidex.Extensions.Actions.Kafka return (float)n.Value; case JsonNumber n when IsTypeOrUnionWith(schema, Schema.Type.Int): return (int)n.Value; - case JsonNumber n: + case JsonNumber n when IsTypeOrUnionWith(schema, Schema.Type.Double): return n.Value; - case JsonBoolean b: + case JsonBoolean b when IsTypeOrUnionWith(schema, Schema.Type.Boolean): return b.Value; - case JsonObject o when (schema is MapSchema map): + case JsonObject o when IsTypeOrUnionWith(schema, Schema.Type.Map): { var mapResult = new Dictionary(); - foreach (var (key, childValue) in o) + if (schema is UnionSchema union) { - mapResult.Add(key, GetValue(childValue, map.ValueSchema)); + var map = (MapSchema)union.Schemas.FirstOrDefault(x => x.Tag == Schema.Type.Map); + + foreach (var (key, childValue) in o) + { + mapResult.Add(key, GetValue(childValue, map?.ValueSchema)); + } + } + else if (schema is MapSchema map) + { + foreach (var (key, childValue) in o) + { + mapResult.Add(key, GetValue(childValue, map?.ValueSchema)); + } } return mapResult; } - case JsonObject o when (schema is RecordSchema record): + case JsonObject o when IsTypeOrUnionWith(schema, Schema.Type.Record): { - var result = new GenericRecord(record); + GenericRecord result = null; - foreach (var (key, childValue) in o) + if (schema is UnionSchema union) { - if (record.TryGetField(key, out var field)) + var record = (RecordSchema)union.Schemas.FirstOrDefault(x => x.Tag == Schema.Type.Record); + + result = new GenericRecord(record); + + foreach (var (key, childValue) in o) { - result.Add(key, GetValue(childValue, field.Schema)); + if (record != null && record.TryGetField(key, out var field)) + { + result.Add(key, GetValue(childValue, field.Schema)); + } + } + } + else if (schema is RecordSchema record) + { + result = new GenericRecord(record); + + foreach (var (key, childValue) in o) + { + if (record.TryGetField(key, out var field)) + { + result.Add(key, GetValue(childValue, field.Schema)); + } } } return result; } - case JsonArray a when (schema is ArraySchema array): + case JsonArray a when IsTypeOrUnionWith(schema, Schema.Type.Array): { var result = new List(); - foreach (var item in a) + if (schema is UnionSchema union) + { + var arraySchema = (ArraySchema)union.Schemas.FirstOrDefault(x => x.Tag == Schema.Type.Array); + + foreach (var item in a) + { + result.Add(GetValue(item, arraySchema?.ItemSchema)); + } + } + else if (schema is ArraySchema array) { - result.Add(GetValue(item, array.ItemSchema)); + foreach (var item in a) + { + result.Add(GetValue(item, array.ItemSchema)); + } } return result.ToArray();