Browse Source

Kafka (#401)

* Added kafka Extension which can push events to Kafka Topics.

* review comments

* kafka tested
pull/402/head
Sebastian Stehle 7 years ago
committed by GitHub
parent
commit
d17c4464b7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      Squidex.sln
  2. 28
      extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
  3. 62
      extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
  4. 30
      extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs
  5. 93
      extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs
  6. 19
      extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs
  7. 1
      extensions/Squidex.Extensions/Squidex.Extensions.csproj
  8. 31
      src/Squidex/appsettings.json

2
Squidex.sln

@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29020.237
VisualStudioVersion = 16.0.29123.88
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex", "src\Squidex\Squidex.csproj", "{61F6BBCE-A080-4400-B194-70E2F5D2096E}"
EndProject

28
extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs

@ -0,0 +1,28 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.ComponentModel.DataAnnotations;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.Rules;
namespace Squidex.Extensions.Actions.Kafka
{
[RuleAction(
IconImage = "<svg version='1.1' xmlns='http://www.w3.org/2000/svg' xmlns:xlink='http://www.w3.org/1999/xlink' x='0px' y='0px' viewBox='0 0 1000 1000' enable-background='new 0 0 1000 1000' xml:space='preserve'><g><path d = 'M674.2,552.7c-38.2,0-72.4,17-95.9,43.6l-60.1-42.5c6.5-17.4,10.1-36.4,10.1-56.1c0-19.5-3.6-38-9.6-55.2l59.9-42c23.5,26.4,57.5,43.4,95.7,43.4c70.4,0,127.7-57.2,127.7-127.7c0-70.4-57.2-127.7-127.7-127.7c-70.4,0-127.7,57.2-127.7,127.7c0,12.5,2,24.8,5.4,36.2l-60.1,42c-25-31.1-61.3-52.8-102.2-59.5v-72.2c57.9-12.3,101.5-63.7,101.5-125C491.1,67.2,433.8,10,363.4,10S235.7,67.2,235.7,137.7c0,60.6,42.5,111.3,99.3,124.5v73.1c-77.8,13.4-136.8,80.9-136.8,162.3c0,81.6,59.7,149.4,137.5,162.5v77.4c-57.2,12.5-100.4,63.7-100.4,124.8c0,70.4,57.2,127.7,127.7,127.7c70.4,0,128.1-57.2,128.1-127.9c0-61-43.2-112.2-100.4-124.8V660c40.2-6.7,75.6-27.9,100.4-58.4l60.4,42.7c-3.4,11.4-5.1,23.5-5.1,36c0,70.4,57.2,127.7,127.7,127.7c70.4,0,127.7-57.2,127.7-127.7C801.6,609.9,744.6,552.7,674.2,552.7L674.2,552.7z M674.2,253.9c34.2,0,61.9,27.7,61.9,61.9c0,34.2-27.7,61.9-61.9,61.9c-34.2,0-62.2-27.7-62.2-61.9C612,281.7,640,253.9,674.2,253.9L674.2,253.9z M301.2,137.7c0-34.2,27.7-61.9,61.9-61.9c34.2,0,61.9,27.7,61.9,61.9s-27.7,61.9-61.9,61.9C329,199.6,301.2,171.7,301.2,137.7L301.2,137.7z M425.1,862.1c0,34.2-27.7,61.9-61.9,61.9c-34.2,0-61.9-27.7-61.9-61.9c0-34.2,27.7-61.9,61.9-61.9C397.4,800.2,425.1,828.1,425.1,862.1L425.1,862.1z M363.2,584c-47.6,0-86.3-38.7-86.3-86.3c0-47.6,38.7-86.3,86.3-86.3c47.6,0,86.3,38.7,86.3,86.3C449.7,545.3,410.8,584,363.2,584L363.2,584z M674.2,742.5c-34.2,0-61.9-27.7-61.9-61.9c0-34.2,27.7-61.9,61.9-61.9c34.2,0,61.9,27.7,61.9,61.9C736.1,714.8,708.2,742.5,674.2,742.5L674.2,742.5z'/></g></svg>",
IconColor = "#404244",
Display = "Push to kafka",
Description = "Connect to Kafka stream and push data to that stream.",
ReadMore = "https://kafka.apache.org/quickstart")]
public sealed class KafkaAction : RuleAction
{
[Required]
[Display(Name = "Topic Name", Description = "The name of the topic.")]
[DataType(DataType.Text)]
[Formattable]
public string TopicName { get; set; }
}
}

62
extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs

@ -0,0 +1,62 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
namespace Squidex.Extensions.Actions.Kafka
{
public sealed class KafkaActionHandler : RuleActionHandler<KafkaAction, KafkaJob>
{
private const string Description = "Push to Kafka";
private readonly KafkaProducer kafkaProducer;
public KafkaActionHandler(RuleEventFormatter formatter, KafkaProducer kafkaProducer)
: base(formatter)
{
this.kafkaProducer = kafkaProducer;
}
protected override (string Description, KafkaJob Data) CreateJob(EnrichedEvent @event, KafkaAction action)
{
var ruleJob = new KafkaJob
{
TopicName = action.TopicName,
MessageKey = @event.Name,
MessageValue = ToEnvelopeJson(@event)
};
return (Description, ruleJob);
}
protected override async Task<Result> ExecuteJobAsync(KafkaJob job, CancellationToken ct)
{
try
{
await kafkaProducer.Send(job.TopicName, job.MessageKey, job.MessageValue);
return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
}
catch (Exception ex)
{
return Result.Failed(ex, "Push to Kafka failed.");
}
}
}
public sealed class KafkaJob
{
public string TopicName { get; set; }
public string MessageKey { get; set; }
public string MessageValue { get; set; }
}
}

30
extensions/Squidex.Extensions/Actions/Kafka/KafkaPlugin.cs

@ -0,0 +1,30 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Squidex.Infrastructure.Plugins;
namespace Squidex.Extensions.Actions.Kafka
{
public sealed class KafkaPlugin : IPlugin
{
public void ConfigureServices(IServiceCollection services, IConfiguration config)
{
var kafkaOptions = config.GetSection("kafka").Get<KafkaProducerOptions>();
if (kafkaOptions.IsProducerConfigured())
{
services.AddRuleAction<KafkaAction, KafkaActionHandler>();
services.AddSingleton<KafkaProducer>();
services.AddSingleton(Options.Create(kafkaOptions));
}
}
}
}

93
extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

@ -0,0 +1,93 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
namespace Squidex.Extensions.Actions.Kafka
{
public sealed class KafkaProducer
{
private readonly IProducer<string, string> producer;
public KafkaProducer(IOptions<KafkaProducerOptions> options, ISemanticLog log)
{
producer = new ProducerBuilder<string, string>(options.Value)
.SetErrorHandler((p, error) =>
{
LogError(log, error);
})
.SetLogHandler((p, message) =>
{
LogMessage(log, message);
})
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.Build();
}
private static void LogMessage(ISemanticLog log, LogMessage message)
{
var level = SemanticLogLevel.Information;
switch (message.Level)
{
case SyslogLevel.Emergency:
level = SemanticLogLevel.Error;
break;
case SyslogLevel.Alert:
level = SemanticLogLevel.Error;
break;
case SyslogLevel.Critical:
level = SemanticLogLevel.Error;
break;
case SyslogLevel.Error:
level = SemanticLogLevel.Error;
break;
case SyslogLevel.Warning:
level = SemanticLogLevel.Warning;
break;
case SyslogLevel.Notice:
level = SemanticLogLevel.Information;
break;
case SyslogLevel.Info:
level = SemanticLogLevel.Information;
break;
case SyslogLevel.Debug:
level = SemanticLogLevel.Debug;
break;
}
log.Log<None>(level, default, (_, w) => w
.WriteProperty("action", "KafkaAction")
.WriteProperty("name", message.Name)
.WriteProperty("message", message.Message));
}
private static void LogError(ISemanticLog log, Error error)
{
log.LogWarning(w => w
.WriteProperty("action", "KafkaError")
.WriteProperty("reason", error.Reason));
}
public async Task<DeliveryResult<string, string>> Send(string topicName, string key, string value)
{
var message = new Message<string, string> { Key = key, Value = value };
return await producer.ProduceAsync(topicName, message);
}
public void Dispose()
{
producer?.Dispose();
}
}
}

19
extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs

@ -0,0 +1,19 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Confluent.Kafka;
namespace Squidex.Extensions.Actions.Kafka
{
public class KafkaProducerOptions : ProducerConfig
{
public bool IsProducerConfigured()
{
return !string.IsNullOrWhiteSpace(this.BootstrapServers);
}
}
}

1
extensions/Squidex.Extensions/Squidex.Extensions.csproj

@ -9,6 +9,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Algolia.Search" Version="5.3.1" />
<PackageReference Include="Confluent.Kafka" Version="1.0.0" />
<PackageReference Include="CoreTweet" Version="1.0.0.483" />
<PackageReference Include="Elasticsearch.Net" Version="6.8.1" />
<PackageReference Include="Microsoft.Extensions.Http" Version="2.2.0" />

31
src/Squidex/appsettings.json

@ -97,29 +97,29 @@
/*
* The host name to your email server.
*/
"server": "",
/*
"server": "",
/*
* The sender email address.
*/
"sender": "hello@squidex.io",
/*
"sender": "hello@squidex.io",
/*
* The username to authenticate to your email server.
*/
"username": "",
/*
"username": "",
/*
* The password to authenticate to your email server.
*/
"password": "",
/*
"password": "",
/*
* Always use SSL if possible.
*/
"enableSsl": true,
/*
"enableSsl": true,
/*
* The port to your email server.
*/
"port": 465
},
"port": 465
},
"notifications": {
/*
* The email subject when a new user is added as contributor.
@ -504,5 +504,12 @@
*/
"exposedConfiguration": {
"version": "squidex:version"
},
/*
*Kafka Producer configuration
*/
"kafka": {
"bootstrapServers": ""
}
}

Loading…
Cancel
Save