|
|
|
@ -15,9 +15,11 @@ using Confluent.Kafka; |
|
|
|
using Confluent.SchemaRegistry; |
|
|
|
using Confluent.SchemaRegistry.Serdes; |
|
|
|
using Microsoft.Extensions.Options; |
|
|
|
using Newtonsoft.Json; |
|
|
|
using Squidex.Infrastructure.Json; |
|
|
|
using Squidex.Infrastructure.Json.Objects; |
|
|
|
using Squidex.Infrastructure.Log; |
|
|
|
using Schema = Avro.Schema; |
|
|
|
|
|
|
|
namespace Squidex.Extensions.Actions.Kafka |
|
|
|
{ |
|
|
|
@ -155,13 +157,20 @@ namespace Squidex.Extensions.Actions.Kafka |
|
|
|
|
|
|
|
private GenericRecord CreateAvroRecord(string json, string avroSchema) |
|
|
|
{ |
|
|
|
var schema = (RecordSchema)Avro.Schema.Parse(avroSchema); |
|
|
|
try |
|
|
|
{ |
|
|
|
var schema = (RecordSchema)Avro.Schema.Parse(avroSchema); |
|
|
|
|
|
|
|
var jsonObject = jsonSerializer.Deserialize<JsonObject>(json); |
|
|
|
var jsonObject = jsonSerializer.Deserialize<JsonObject>(json); |
|
|
|
|
|
|
|
var result = (GenericRecord)GetValue(jsonObject, schema); |
|
|
|
var result = (GenericRecord)GetValue(jsonObject, schema); |
|
|
|
|
|
|
|
return result; |
|
|
|
return result; |
|
|
|
} |
|
|
|
catch (JsonException ex) |
|
|
|
{ |
|
|
|
throw new InvalidOperationException($"Failed to parse json: {json}, got {ex.Message}", ex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void Dispose() |
|
|
|
@ -170,12 +179,18 @@ namespace Squidex.Extensions.Actions.Kafka |
|
|
|
avroProducer?.Dispose(); |
|
|
|
} |
|
|
|
|
|
|
|
private object GetValue(IJsonValue value, Avro.Schema schema) |
|
|
|
private object GetValue(IJsonValue value, Schema schema) |
|
|
|
{ |
|
|
|
switch (value) |
|
|
|
{ |
|
|
|
case JsonString s: |
|
|
|
return s.Value; |
|
|
|
case JsonNumber n when schema.Tag == Schema.Type.Long: |
|
|
|
return (long)n.Value; |
|
|
|
case JsonNumber n when schema.Tag == Schema.Type.Float: |
|
|
|
return (float)n.Value; |
|
|
|
case JsonNumber n when schema.Tag == Schema.Type.Int: |
|
|
|
return (int)n.Value; |
|
|
|
case JsonNumber n: |
|
|
|
return n.Value; |
|
|
|
case JsonBoolean b: |
|
|
|
|