Browse Source

Merge branch 'master' of github.com:Squidex/squidex

pull/528/head
Sebastian 6 years ago
parent
commit
16994ccb29
  1. 15
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
  2. 73
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
  3. 4
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

15
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs

@ -25,5 +25,20 @@ namespace Squidex.Extensions.Actions.Kafka
[DataType(DataType.Text)]
[Formattable]
public string TopicName { get; set; }
[Display(Name = "Payload (Optional)", Description = "Leave it empty to use the full event as body.")]
[DataType(DataType.MultilineText)]
[Formattable]
public string Payload { get; set; }
[Display(Name = "Key", Description = "The message key, commonly used for partitioning.")]
[DataType(DataType.Text)]
[Formattable]
public string Key { get; set; }
[Display(Name = "Headers (Optional)", Description = "The message headers in the format '[Key]=[Value]', one entry per line.")]
[DataType(DataType.MultilineText)]
[Formattable]
public string Headers { get; set; }
}
}

73
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs

@ -6,8 +6,11 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.Rules.EnrichedEvents;
@ -26,21 +29,83 @@ namespace Squidex.Extensions.Actions.Kafka
protected override (string Description, KafkaJob Data) CreateJob(EnrichedEvent @event, KafkaAction action)
{
string value, key;
if (!string.IsNullOrEmpty(action.Payload))
{
value = Format(action.Payload, @event);
}
else
{
value = ToEnvelopeJson(@event);
}
if (!string.IsNullOrEmpty(action.Key))
{
key = Format(action.Key, @event);
}
else
{
key = @event.Name;
}
var ruleJob = new KafkaJob
{
TopicName = action.TopicName,
MessageKey = @event.Name,
MessageValue = ToEnvelopeJson(@event)
MessageKey = key,
MessageValue = value,
Headers = ParseHeaders(action.Headers, @event)
};
return (Description, ruleJob);
}
private Dictionary<string, string> ParseHeaders(string headers, EnrichedEvent @event)
{
if (string.IsNullOrWhiteSpace(headers))
{
return null;
}
var headersDictionary = new Dictionary<string, string>();
var lines = headers.Split('\n');
foreach (var line in lines)
{
var indexEqual = line.IndexOf('=');
if (indexEqual > 0 && indexEqual < line.Length - 1)
{
var key = line.Substring(0, indexEqual);
var val = line.Substring(indexEqual + 1);
val = Format(val, @event);
headersDictionary[key] = val;
}
}
return headersDictionary;
}
protected override async Task<Result> ExecuteJobAsync(KafkaJob job, CancellationToken ct = default)
{
try
{
await kafkaProducer.Send(job.TopicName, job.MessageKey, job.MessageValue);
var message = new Message<string, string> { Key = job.MessageKey, Value = job.MessageValue };
if (job.Headers?.Count > 0)
{
message.Headers = new Headers();
foreach (var header in job.Headers)
{
message.Headers.Add(header.Key, Encoding.UTF8.GetBytes(header.Value));
}
}
await kafkaProducer.Send(job.TopicName, message);
return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
}
@ -58,5 +123,7 @@ namespace Squidex.Extensions.Actions.Kafka
public string MessageKey { get; set; }
public string MessageValue { get; set; }
public Dictionary<string, string> Headers { get; set; }
}
}

4
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

@ -77,10 +77,8 @@ namespace Squidex.Extensions.Actions.Kafka
.WriteProperty("reason", error.Reason));
}
public async Task<DeliveryResult<string, string>> Send(string topicName, string key, string value)
public async Task<DeliveryResult<string, string>> Send(string topicName, Message<string, string> message)
{
var message = new Message<string, string> { Key = key, Value = value };
return await producer.ProduceAsync(topicName, message);
}

Loading…
Cancel
Save