From d17c4464b77b3809beccc4ee20cf447b3d8f502f Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 18 Aug 2019 18:19:11 +0200 Subject: [PATCH] Kafka (#401) * Added kafka Extension which can push events to Kafka Topics. * review comments * kafka tested --- Squidex.sln | 2 +- .../Actions/Kafka/KafkaAction.cs | 28 ++++++ .../Actions/Kafka/KafkaActionHandler.cs | 62 +++++++++++++ .../Actions/Kafka/KafkaPlugin.cs | 30 ++++++ .../Actions/Kafka/KafkaProducer.cs | 93 +++++++++++++++++++ .../Actions/Kafka/KafkaProducerOptions.cs | 19 ++++ .../Squidex.Extensions.csproj | 1 + src/Squidex/appsettings.json | 31 ++++--- 8 files changed, 253 insertions(+), 13 deletions(-) create mode 100644 extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs create mode 100644 extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs create mode 100644 extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs create mode 100644 extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs create mode 100644 extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs diff --git a/Squidex.sln b/Squidex.sln index 9353d4689..a5953a8b1 100644 --- a/Squidex.sln +++ b/Squidex.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 -VisualStudioVersion = 16.0.29020.237 +VisualStudioVersion = 16.0.29123.88 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex", "src\Squidex\Squidex.csproj", "{61F6BBCE-A080-4400-B194-70E2F5D2096E}" EndProject diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs new file mode 100644 index 000000000..a10cc5a0d --- /dev/null +++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs @@ -0,0 +1,28 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.ComponentModel.DataAnnotations; +using Squidex.Domain.Apps.Core.HandleRules; +using Squidex.Domain.Apps.Core.Rules; + +namespace Squidex.Extensions.Actions.Kafka +{ + [RuleAction( + IconImage = "", + IconColor = "#404244", + Display = "Push to kafka", + Description = "Connect to Kafka stream and push data to that stream.", + ReadMore = "https://kafka.apache.org/quickstart")] + public sealed class KafkaAction : RuleAction + { + [Required] + [Display(Name = "Topic Name", Description = "The name of the topic.")] + [DataType(DataType.Text)] + [Formattable] + public string TopicName { get; set; } + } +} diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs new file mode 100644 index 000000000..131917372 --- /dev/null +++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs @@ -0,0 +1,62 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Threading; +using System.Threading.Tasks; +using Squidex.Domain.Apps.Core.HandleRules; +using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents; + +namespace Squidex.Extensions.Actions.Kafka +{ + public sealed class KafkaActionHandler : RuleActionHandler + { + private const string Description = "Push to Kafka"; + private readonly KafkaProducer kafkaProducer; + + public KafkaActionHandler(RuleEventFormatter formatter, KafkaProducer kafkaProducer) + : base(formatter) + { + this.kafkaProducer = kafkaProducer; + } + + protected override (string Description, KafkaJob Data) CreateJob(EnrichedEvent @event, KafkaAction action) + { + var ruleJob = new KafkaJob + { + TopicName = action.TopicName, + MessageKey = @event.Name, + MessageValue = ToEnvelopeJson(@event) + }; + + return (Description, ruleJob); + } + + protected override async Task ExecuteJobAsync(KafkaJob job, CancellationToken ct) + { + try + { + await kafkaProducer.Send(job.TopicName, job.MessageKey, job.MessageValue); + + return Result.Success($"Event pushed to {job.TopicName} kafka topic."); + } + catch (Exception ex) + { + return Result.Failed(ex, "Push to Kafka failed."); + } + } + } + + public sealed class KafkaJob + { + public string TopicName { get; set; } + + public string MessageKey { get; set; } + + public string MessageValue { get; set; } + } +} diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs new file mode 100644 index 000000000..400c0662f --- /dev/null +++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs @@ -0,0 +1,30 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Squidex.Infrastructure.Plugins; + +namespace Squidex.Extensions.Actions.Kafka +{ + public sealed class KafkaPlugin : IPlugin + { + public void ConfigureServices(IServiceCollection services, IConfiguration config) + { + var kafkaOptions = config.GetSection("kafka").Get(); + + if (kafkaOptions.IsProducerConfigured()) + { + services.AddRuleAction(); + + services.AddSingleton(); + services.AddSingleton(Options.Create(kafkaOptions)); + } + } + } +} diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs new file mode 100644 index 000000000..053c53abb --- /dev/null +++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs @@ -0,0 +1,93 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; + +namespace Squidex.Extensions.Actions.Kafka +{ + public sealed class KafkaProducer + { + private readonly IProducer producer; + + public KafkaProducer(IOptions options, ISemanticLog log) + { + producer = new ProducerBuilder(options.Value) + .SetErrorHandler((p, error) => + { + LogError(log, error); + }) + .SetLogHandler((p, message) => + { + LogMessage(log, message); + }) + .SetKeySerializer(Serializers.Utf8) + .SetValueSerializer(Serializers.Utf8) + .Build(); + } + + private static void LogMessage(ISemanticLog log, LogMessage message) + { + var level = SemanticLogLevel.Information; + + switch (message.Level) + { + case SyslogLevel.Emergency: + level = SemanticLogLevel.Error; + break; + case SyslogLevel.Alert: + level = SemanticLogLevel.Error; + break; + case SyslogLevel.Critical: + level = SemanticLogLevel.Error; + break; + case SyslogLevel.Error: + level = SemanticLogLevel.Error; + break; + case SyslogLevel.Warning: + level = SemanticLogLevel.Warning; + break; + case SyslogLevel.Notice: + level = SemanticLogLevel.Information; + break; + case SyslogLevel.Info: + level = SemanticLogLevel.Information; + break; + case SyslogLevel.Debug: + level = SemanticLogLevel.Debug; + break; + } + + log.Log(level, default, (_, w) => w + .WriteProperty("action", "KafkaAction") + .WriteProperty("name", message.Name) + .WriteProperty("message", message.Message)); + } + + private static void LogError(ISemanticLog log, Error error) + { + log.LogWarning(w => w + .WriteProperty("action", "KafkaError") + .WriteProperty("reason", error.Reason)); + } + + public async Task> Send(string topicName, string key, string value) + { + var message = new Message { Key = key, Value = value }; + + return await producer.ProduceAsync(topicName, message); + } + + public void Dispose() + { + producer?.Dispose(); + } + } +} diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs new file mode 100644 index 000000000..ae59143ff --- /dev/null +++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs @@ -0,0 +1,19 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Confluent.Kafka; + +namespace Squidex.Extensions.Actions.Kafka +{ + public class KafkaProducerOptions : ProducerConfig + { + public bool IsProducerConfigured() + { + return !string.IsNullOrWhiteSpace(this.BootstrapServers); + } + } +} diff --git a/extensions/Squidex.Extensions/Squidex.Extensions.csproj b/extensions/Squidex.Extensions/Squidex.Extensions.csproj index 7d3fcddd3..2052f602f 100644 --- a/extensions/Squidex.Extensions/Squidex.Extensions.csproj +++ b/extensions/Squidex.Extensions/Squidex.Extensions.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Squidex/appsettings.json b/src/Squidex/appsettings.json index 86ff93356..2e8254483 100644 --- a/src/Squidex/appsettings.json +++ b/src/Squidex/appsettings.json @@ -97,29 +97,29 @@ /* * The host name to your email server. */ - "server": "", - /* + "server": "", + /* * The sender email address. */ - "sender": "hello@squidex.io", - /* + "sender": "hello@squidex.io", + /* * The username to authenticate to your email server. */ - "username": "", - /* + "username": "", + /* * The password to authenticate to your email server. */ - "password": "", - /* + "password": "", + /* * Always use SSL if possible. */ - "enableSsl": true, - /* + "enableSsl": true, + /* * The port to your email server. */ - "port": 465 - }, + "port": 465 + }, "notifications": { /* * The email subject when a new user is added as contributor. @@ -504,5 +504,12 @@ */ "exposedConfiguration": { "version": "squidex:version" + }, + + /* + *Kafka Producer configuration + */ + "kafka": { + "bootstrapServers": "" } }