Browse Source

Kafka partitioning support (#530)

* Kafka rule extension. Added payload, key and headers for JSON.

* Kafka avro support.

* Adding support for kafka custom partitioning.

Co-authored-by: Mittul Madaan <mittul.madaan@reedelsevier.com>
pull/531/head
Mittul Madaan 6 years ago
committed by GitHub
parent
commit
657d5f1afb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
  2. 24
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
  3. 44
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

9
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs

@ -36,6 +36,15 @@ namespace Squidex.Extensions.Actions.Kafka
[Formattable]
public string Key { get; set; }
[Display(Name = "Partition Key", Description = "The partition key, only used when we don't want to define partiontionig with key.")]
[DataType(DataType.Text)]
[Formattable]
public string PartitionKey { get; set; }
[Display(Name = "Partition Count", Description = "Define the number of partitions for specific topic.")]
[DataType(DataType.Text)]
public int PartitionCount { get; set; }
[Display(Name = "Headers (Optional)", Description = "The message headers in the format '[Key]=[Value]', one entry per line.")]
[DataType(DataType.MultilineText)]
[Formattable]

24
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs

@ -7,10 +7,8 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.Rules.EnrichedEvents;
@ -55,7 +53,9 @@ namespace Squidex.Extensions.Actions.Kafka
MessageKey = key,
MessageValue = value,
Headers = await ParseHeadersAsync(action.Headers, @event),
Schema = action.Schema
Schema = action.Schema,
PartitionKey = await FormatAsync(action.PartitionKey, @event),
PartitionCount = action.PartitionCount
};
return (Description, ruleJob);
@ -94,19 +94,7 @@ namespace Squidex.Extensions.Actions.Kafka
{
try
{
var message = new Message<string, string> { Key = job.MessageKey, Value = job.MessageValue };
if (job.Headers?.Count > 0)
{
message.Headers = new Headers();
foreach (var header in job.Headers)
{
message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value));
}
}
await kafkaProducer.Send(job.TopicName, message, job.Schema);
await kafkaProducer.SendAsync(job);
return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
}
@ -128,5 +116,9 @@ namespace Squidex.Extensions.Actions.Kafka
public Dictionary<string, string> Headers { get; set; }
public string Schema { get; set; }
public string PartitionKey { get; set; }
public int PartitionCount { get; set; }
}
}

44
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

@ -5,7 +5,9 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Avro;
using Avro.Generic;
@ -107,18 +109,48 @@ namespace Squidex.Extensions.Actions.Kafka
.WriteProperty("reason", error.Reason));
}
public async Task<DeliveryResult<string, string>> Send(string topicName, Message<string, string> message, string schema)
public async Task SendAsync(KafkaJob job)
{
if (!string.IsNullOrWhiteSpace(schema))
if (!string.IsNullOrWhiteSpace(job.Schema))
{
var value = CreateAvroRecord(message.Value, schema);
var value = CreateAvroRecord(job.MessageValue, job.Schema);
var avroMessage = new Message<string, GenericRecord> { Key = message.Key, Headers = message.Headers, Value = value };
var message = new Message<string, GenericRecord> { Value = value };
await avroProducer.ProduceAsync(topicName, avroMessage);
await ProduceAsync(avroProducer, message, job);
}
else
{
var message = new Message<string, string> { Value = job.MessageValue };
await ProduceAsync(textProducer, message, job);
}
}
private async Task ProduceAsync<T>(IProducer<string, T> producer, Message<string, T> message, KafkaJob job)
{
message.Key = job.MessageKey;
if (job.Headers?.Count > 0)
{
message.Headers = new Headers();
return await textProducer.ProduceAsync(topicName, message);
foreach (var header in job.Headers)
{
message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value));
}
}
if (!string.IsNullOrWhiteSpace(job.PartitionKey) && job.PartitionCount > 0)
{
var partition = Math.Abs(job.PartitionKey.GetHashCode()) % job.PartitionCount;
await producer.ProduceAsync(new TopicPartition(job.TopicName, partition), message);
}
else
{
await producer.ProduceAsync(job.TopicName, message);
}
}
private GenericRecord CreateAvroRecord(string json, string avroSchema)

Loading…
Cancel
Save