Browse Source

ICIS-VEGA : FOSS-9 - Added support for nullable fields of type union in Kafka. Enhanced Kafka rule success message with message key. (#677)

Co-authored-by: Mittul Madaan <mittul.madaan@reedelsevier.com>
pull/679/head
Sebastian Stehle 5 years ago
committed by GitHub
parent
commit
4ed68373a0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
  2. 71
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

2
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs

@ -96,7 +96,7 @@ namespace Squidex.Extensions.Actions.Kafka
{
await kafkaProducer.SendAsync(job, ct);
return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
return Result.Success($"Event pushed to {job.TopicName} kafka topic with {job.MessageKey} message key.");
}
catch (Exception ex)
{

71
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

@ -184,7 +184,7 @@ namespace Squidex.Extensions.Actions.Kafka
{
switch (value)
{
case JsonString s:
case JsonString s when IsTypeOrUnionWith(schema, Schema.Type.String):
return s.Value;
case JsonNumber n when IsTypeOrUnionWith(schema, Schema.Type.Long):
return (long)n.Value;
@ -192,44 +192,87 @@ namespace Squidex.Extensions.Actions.Kafka
return (float)n.Value;
case JsonNumber n when IsTypeOrUnionWith(schema, Schema.Type.Int):
return (int)n.Value;
case JsonNumber n:
case JsonNumber n when IsTypeOrUnionWith(schema, Schema.Type.Double):
return n.Value;
case JsonBoolean b:
case JsonBoolean b when IsTypeOrUnionWith(schema, Schema.Type.Boolean):
return b.Value;
case JsonObject o when (schema is MapSchema map):
case JsonObject o when IsTypeOrUnionWith(schema, Schema.Type.Map):
{
var mapResult = new Dictionary<string, object>();
foreach (var (key, childValue) in o)
if (schema is UnionSchema union)
{
mapResult.Add(key, GetValue(childValue, map.ValueSchema));
var map = (MapSchema)union.Schemas.FirstOrDefault(x => x.Tag == Schema.Type.Map);
foreach (var (key, childValue) in o)
{
mapResult.Add(key, GetValue(childValue, map?.ValueSchema));
}
}
else if (schema is MapSchema map)
{
foreach (var (key, childValue) in o)
{
mapResult.Add(key, GetValue(childValue, map?.ValueSchema));
}
}
return mapResult;
}
case JsonObject o when (schema is RecordSchema record):
case JsonObject o when IsTypeOrUnionWith(schema, Schema.Type.Record):
{
var result = new GenericRecord(record);
GenericRecord result = null;
foreach (var (key, childValue) in o)
if (schema is UnionSchema union)
{
if (record.TryGetField(key, out var field))
var record = (RecordSchema)union.Schemas.FirstOrDefault(x => x.Tag == Schema.Type.Record);
result = new GenericRecord(record);
foreach (var (key, childValue) in o)
{
result.Add(key, GetValue(childValue, field.Schema));
if (record != null && record.TryGetField(key, out var field))
{
result.Add(key, GetValue(childValue, field.Schema));
}
}
}
else if (schema is RecordSchema record)
{
result = new GenericRecord(record);
foreach (var (key, childValue) in o)
{
if (record.TryGetField(key, out var field))
{
result.Add(key, GetValue(childValue, field.Schema));
}
}
}
return result;
}
case JsonArray a when (schema is ArraySchema array):
case JsonArray a when IsTypeOrUnionWith(schema, Schema.Type.Array):
{
var result = new List<object>();
foreach (var item in a)
if (schema is UnionSchema union)
{
var arraySchema = (ArraySchema)union.Schemas.FirstOrDefault(x => x.Tag == Schema.Type.Array);
foreach (var item in a)
{
result.Add(GetValue(item, arraySchema?.ItemSchema));
}
}
else if (schema is ArraySchema array)
{
result.Add(GetValue(item, array.ItemSchema));
foreach (var item in a)
{
result.Add(GetValue(item, array.ItemSchema));
}
}
return result.ToArray();

Loading…
Cancel
Save