diff --git a/Squidex.sln b/Squidex.sln
index 9353d4689..a5953a8b1 100644
--- a/Squidex.sln
+++ b/Squidex.sln
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
-VisualStudioVersion = 16.0.29020.237
+VisualStudioVersion = 16.0.29123.88
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex", "src\Squidex\Squidex.csproj", "{61F6BBCE-A080-4400-B194-70E2F5D2096E}"
EndProject
diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
new file mode 100644
index 000000000..a10cc5a0d
--- /dev/null
+++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
@@ -0,0 +1,28 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using System.ComponentModel.DataAnnotations;
+using Squidex.Domain.Apps.Core.HandleRules;
+using Squidex.Domain.Apps.Core.Rules;
+
+namespace Squidex.Extensions.Actions.Kafka
+{
+ [RuleAction(
+ IconImage = "",
+ IconColor = "#404244",
+ Display = "Push to kafka",
+ Description = "Connect to Kafka stream and push data to that stream.",
+ ReadMore = "https://kafka.apache.org/quickstart")]
+ public sealed class KafkaAction : RuleAction
+ {
+ [Required]
+ [Display(Name = "Topic Name", Description = "The name of the topic.")]
+ [DataType(DataType.Text)]
+ [Formattable]
+ public string TopicName { get; set; }
+ }
+}
diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
new file mode 100644
index 000000000..131917372
--- /dev/null
+++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
@@ -0,0 +1,62 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Squidex.Domain.Apps.Core.HandleRules;
+using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
+
+namespace Squidex.Extensions.Actions.Kafka
+{
+ public sealed class KafkaActionHandler : RuleActionHandler
+ {
+ private const string Description = "Push to Kafka";
+ private readonly KafkaProducer kafkaProducer;
+
+ public KafkaActionHandler(RuleEventFormatter formatter, KafkaProducer kafkaProducer)
+ : base(formatter)
+ {
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ protected override (string Description, KafkaJob Data) CreateJob(EnrichedEvent @event, KafkaAction action)
+ {
+ var ruleJob = new KafkaJob
+ {
+ TopicName = action.TopicName,
+ MessageKey = @event.Name,
+ MessageValue = ToEnvelopeJson(@event)
+ };
+
+ return (Description, ruleJob);
+ }
+
+ protected override async Task ExecuteJobAsync(KafkaJob job, CancellationToken ct)
+ {
+ try
+ {
+ await kafkaProducer.Send(job.TopicName, job.MessageKey, job.MessageValue);
+
+ return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
+ }
+ catch (Exception ex)
+ {
+ return Result.Failed(ex, "Push to Kafka failed.");
+ }
+ }
+ }
+
+ public sealed class KafkaJob
+ {
+ public string TopicName { get; set; }
+
+ public string MessageKey { get; set; }
+
+ public string MessageValue { get; set; }
+ }
+}
diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs
new file mode 100644
index 000000000..400c0662f
--- /dev/null
+++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs
@@ -0,0 +1,30 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Squidex.Infrastructure.Plugins;
+
+namespace Squidex.Extensions.Actions.Kafka
+{
+ public sealed class KafkaPlugin : IPlugin
+ {
+ public void ConfigureServices(IServiceCollection services, IConfiguration config)
+ {
+ var kafkaOptions = config.GetSection("kafka").Get();
+
+ if (kafkaOptions.IsProducerConfigured())
+ {
+ services.AddRuleAction();
+
+ services.AddSingleton();
+ services.AddSingleton(Options.Create(kafkaOptions));
+ }
+ }
+ }
+}
diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs
new file mode 100644
index 000000000..053c53abb
--- /dev/null
+++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs
@@ -0,0 +1,93 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using Squidex.Infrastructure;
+using Squidex.Infrastructure.Log;
+
+namespace Squidex.Extensions.Actions.Kafka
+{
+ public sealed class KafkaProducer
+ {
+ private readonly IProducer producer;
+
+ public KafkaProducer(IOptions options, ISemanticLog log)
+ {
+ producer = new ProducerBuilder(options.Value)
+ .SetErrorHandler((p, error) =>
+ {
+ LogError(log, error);
+ })
+ .SetLogHandler((p, message) =>
+ {
+ LogMessage(log, message);
+ })
+ .SetKeySerializer(Serializers.Utf8)
+ .SetValueSerializer(Serializers.Utf8)
+ .Build();
+ }
+
+ private static void LogMessage(ISemanticLog log, LogMessage message)
+ {
+ var level = SemanticLogLevel.Information;
+
+ switch (message.Level)
+ {
+ case SyslogLevel.Emergency:
+ level = SemanticLogLevel.Error;
+ break;
+ case SyslogLevel.Alert:
+ level = SemanticLogLevel.Error;
+ break;
+ case SyslogLevel.Critical:
+ level = SemanticLogLevel.Error;
+ break;
+ case SyslogLevel.Error:
+ level = SemanticLogLevel.Error;
+ break;
+ case SyslogLevel.Warning:
+ level = SemanticLogLevel.Warning;
+ break;
+ case SyslogLevel.Notice:
+ level = SemanticLogLevel.Information;
+ break;
+ case SyslogLevel.Info:
+ level = SemanticLogLevel.Information;
+ break;
+ case SyslogLevel.Debug:
+ level = SemanticLogLevel.Debug;
+ break;
+ }
+
+ log.Log(level, default, (_, w) => w
+ .WriteProperty("action", "KafkaAction")
+ .WriteProperty("name", message.Name)
+ .WriteProperty("message", message.Message));
+ }
+
+ private static void LogError(ISemanticLog log, Error error)
+ {
+ log.LogWarning(w => w
+ .WriteProperty("action", "KafkaError")
+ .WriteProperty("reason", error.Reason));
+ }
+
+ public async Task> Send(string topicName, string key, string value)
+ {
+ var message = new Message { Key = key, Value = value };
+
+ return await producer.ProduceAsync(topicName, message);
+ }
+
+ public void Dispose()
+ {
+ producer?.Dispose();
+ }
+ }
+}
diff --git a/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs
new file mode 100644
index 000000000..ae59143ff
--- /dev/null
+++ b/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs
@@ -0,0 +1,19 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using Confluent.Kafka;
+
+namespace Squidex.Extensions.Actions.Kafka
+{
+ public class KafkaProducerOptions : ProducerConfig
+ {
+ public bool IsProducerConfigured()
+ {
+ return !string.IsNullOrWhiteSpace(this.BootstrapServers);
+ }
+ }
+}
diff --git a/extensions/Squidex.Extensions/Squidex.Extensions.csproj b/extensions/Squidex.Extensions/Squidex.Extensions.csproj
index 7d3fcddd3..2052f602f 100644
--- a/extensions/Squidex.Extensions/Squidex.Extensions.csproj
+++ b/extensions/Squidex.Extensions/Squidex.Extensions.csproj
@@ -9,6 +9,7 @@
+
diff --git a/src/Squidex/appsettings.json b/src/Squidex/appsettings.json
index 86ff93356..2e8254483 100644
--- a/src/Squidex/appsettings.json
+++ b/src/Squidex/appsettings.json
@@ -97,29 +97,29 @@
/*
* The host name to your email server.
*/
- "server": "",
- /*
+ "server": "",
+ /*
* The sender email address.
*/
- "sender": "hello@squidex.io",
- /*
+ "sender": "hello@squidex.io",
+ /*
* The username to authenticate to your email server.
*/
- "username": "",
- /*
+ "username": "",
+ /*
* The password to authenticate to your email server.
*/
- "password": "",
- /*
+ "password": "",
+ /*
* Always use SSL if possible.
*/
- "enableSsl": true,
- /*
+ "enableSsl": true,
+ /*
* The port to your email server.
*/
- "port": 465
- },
+ "port": 465
+ },
"notifications": {
/*
* The email subject when a new user is added as contributor.
@@ -504,5 +504,12 @@
*/
"exposedConfiguration": {
"version": "squidex:version"
+ },
+
+ /*
+ *Kafka Producer configuration
+ */
+ "kafka": {
+ "bootstrapServers": ""
}
}