Browse Source

Merge pull request #5034 from abpframework/liangshiwei/Eventbus-kafka

Add Kafka event bus integration
pull/5232/head
Halil İbrahim Kalkan 6 years ago
committed by GitHub
parent
commit
120804d14c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      framework/Volo.Abp.sln
  2. 3
      framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml
  3. 30
      framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd
  4. 16
      framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj
  5. 27
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs
  6. 12
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs
  7. 208
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  8. 3
      framework/src/Volo.Abp.Kafka/FodyWeavers.xml
  9. 30
      framework/src/Volo.Abp.Kafka/FodyWeavers.xsd
  10. 17
      framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
  11. 31
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs
  12. 22
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs
  13. 109
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs
  14. 10
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs
  15. 11
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs
  16. 19
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs
  17. 11
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs
  18. 10
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs
  19. 35
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs
  20. 156
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs
  21. 32
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs
  22. 102
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
  23. 27
      framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs
  24. 2
      nupkg/common.ps1

14
framework/Volo.Abp.sln

@ -323,6 +323,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws.Tests", "test\Volo.Abp.BlobStoring.Aws.Tests\Volo.Abp.BlobStoring.Aws.Tests.csproj", "{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Kafka", "src\Volo.Abp.Kafka\Volo.Abp.Kafka.csproj", "{2A864049-9CD5-4493-8CDB-C408474D43D4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Kafka", "src\Volo.Abp.EventBus.Kafka\Volo.Abp.EventBus.Kafka.csproj", "{C1D891B0-AE83-42CB-987D-425A2787DE78}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.GlobalFeatures", "src\Volo.Abp.GlobalFeatures\Volo.Abp.GlobalFeatures.csproj", "{04F44063-C952-403A-815F-EFB778BDA125}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.GlobalFeatures.Tests", "test\Volo.Abp.GlobalFeatures.Tests\Volo.Abp.GlobalFeatures.Tests.csproj", "{231F1581-AA21-44C3-BF27-51EB3AD5355C}"
@ -965,6 +969,14 @@ Global
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.Build.0 = Release|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.Build.0 = Release|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.Build.0 = Release|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Debug|Any CPU.Build.0 = Debug|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -1136,6 +1148,8 @@ Global
{8E49687A-E69F-49F2-8DB0-428D0883A937} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{50968CDE-1029-4051-B2E5-B69D0ECF2A18} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{2A864049-9CD5-4493-8CDB-C408474D43D4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{C1D891B0-AE83-42CB-987D-425A2787DE78} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{04F44063-C952-403A-815F-EFB778BDA125} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{231F1581-AA21-44C3-BF27-51EB3AD5355C} = {447C8A77-E5F0-4538-8687-7383196D04EA}
EndGlobalSection

3
framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml

@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>

30
framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

16
framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace />
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.Kafka\Volo.Abp.Kafka.csproj" />
</ItemGroup>
</Project>

27
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs

@ -0,0 +1,27 @@
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Kafka;
using Volo.Abp.Modularity;
namespace Volo.Abp.EventBus.Kafka
{
[DependsOn(
typeof(AbpEventBusModule),
typeof(AbpKafkaModule))]
public class AbpEventBusKafkaModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<AbpKafkaEventBusOptions>(configuration.GetSection("Kafka:EventBus"));
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context
.ServiceProvider
.GetRequiredService<KafkaDistributedEventBus>()
.Initialize();
}
}
}

12
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs

@ -0,0 +1,12 @@
namespace Volo.Abp.EventBus.Kafka
{
public class AbpKafkaEventBusOptions
{
public string ConnectionName { get; set; }
public string TopicName { get; set; }
public string GroupId { get; set; }
}
}

208
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -0,0 +1,208 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
namespace Volo.Abp.EventBus.Kafka
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
{
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
protected IKafkaSerializer Serializer { get; }
protected IProducerPool ProducerPool { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IKafkaMessageConsumer Consumer { get; private set; }
public KafkaDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions,
IKafkaMessageConsumerFactory messageConsumerFactory,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IKafkaSerializer serializer,
IProducerPool producerPool)
: base(serviceScopeFactory, currentTenant)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
MessageConsumerFactory = messageConsumerFactory;
Serializer = serializer;
ProducerPool = producerPool;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}
public void Initialize()
{
Consumer = MessageConsumerFactory.Create(
AbpKafkaEventBusOptions.TopicName,
AbpKafkaEventBusOptions.GroupId,
AbpKafkaEventBusOptions.ConnectionName);
Consumer.OnMessageReceived(ProcessEventAsync);
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
private async Task ProcessEventAsync(Message<string, byte[]> message)
{
var eventName = message.Key;
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}
var eventData = Serializer.Deserialize(message.Value, eventType);
await TriggerHandlersAsync(eventType, eventData);
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
/// <inheritdoc/>
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));
GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
if (singleInstanceFactory == null)
{
return false;
}
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
if (actionHandler == null)
{
return false;
}
return actionHandler.Action == action;
});
});
}
/// <inheritdoc/>
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory handlerFactory &&
handlerFactory.HandlerInstance == handler
);
});
}
/// <inheritdoc/>
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}
/// <inheritdoc/>
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
public override async Task PublishAsync(Type eventType, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
await producer.ProduceAsync(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = eventName, Value = body
});
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
return new List<IEventHandlerFactory>();
}
);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
)
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}
//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}
return false;
}
}
}

3
framework/src/Volo.Abp.Kafka/FodyWeavers.xml

@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>

30
framework/src/Volo.Abp.Kafka/FodyWeavers.xsd

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

17
framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.0" />
<ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" />
<ProjectReference Include="..\Volo.Abp.Threading\Volo.Abp.Threading.csproj" />
</ItemGroup>
</Project>

31
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs

@ -0,0 +1,31 @@
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Json;
using Volo.Abp.Modularity;
using Volo.Abp.Threading;
namespace Volo.Abp.Kafka
{
[DependsOn(
typeof(AbpJsonModule),
typeof(AbpThreadingModule)
)]
public class AbpKafkaModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<AbpKafkaOptions>(configuration.GetSection("Kafka"));
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
context.ServiceProvider
.GetRequiredService<IConsumerPool>()
.Dispose();
context.ServiceProvider
.GetRequiredService<IProducerPool>()
.Dispose();
}
}
}

22
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs

@ -0,0 +1,22 @@
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
namespace Volo.Abp.Kafka
{
public class AbpKafkaOptions
{
public KafkaConnections Connections { get; }
public Action<ProducerConfig> ConfigureProducer { get; set; }
public Action<ConsumerConfig> ConfigureConsumer { get; set; }
public Action<TopicSpecification> ConfigureTopic { get; set; }
public AbpKafkaOptions()
{
Connections = new KafkaConnections();
}
}
}

109
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs

@ -0,0 +1,109 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Kafka
{
public class ConsumerPool : IConsumerPool, ISingletonDependency
{
protected AbpKafkaOptions Options { get; }
protected ConcurrentDictionary<string, IConsumer<string, byte[]>> Consumers { get; }
protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
public ILogger<ConsumerPool> Logger { get; set; }
private bool _isDisposed;
public ConsumerPool(IOptions<AbpKafkaOptions> options)
{
Options = options.Value;
Consumers = new ConcurrentDictionary<string, IConsumer<string, byte[]>>();
Logger = new NullLogger<ConsumerPool>();
}
public virtual IConsumer<string, byte[]> Get(string groupId, string connectionName = null)
{
connectionName ??= KafkaConnections.DefaultConnectionName;
return Consumers.GetOrAdd(
connectionName, connection =>
{
var config = new ConsumerConfig(Options.Connections.GetOrDefault(connection))
{
GroupId = groupId,
EnableAutoCommit = false
};
Options.ConfigureConsumer?.Invoke(config);
return new ConsumerBuilder<string, byte[]>(config).Build();
}
);
}
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!Consumers.Any())
{
Logger.LogDebug($"Disposed consumer pool with no consumers in the pool.");
return;
}
var poolDisposeStopwatch = Stopwatch.StartNew();
Logger.LogInformation($"Disposing consumer pool ({Consumers.Count} consumers).");
var remainingWaitDuration = TotalDisposeWaitDuration;
foreach (var consumer in Consumers.Values)
{
var poolItemDisposeStopwatch = Stopwatch.StartNew();
try
{
consumer.Close();
consumer.Dispose();
}
catch
{
}
poolItemDisposeStopwatch.Stop();
remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed
? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed)
: TimeSpan.Zero;
}
poolDisposeStopwatch.Stop();
Logger.LogInformation(
$"Disposed Kafka Consumer Pool ({Consumers.Count} consumers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms).");
if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0)
{
Logger.LogWarning(
$"Disposing Kafka Consumer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms.");
}
Consumers.Clear();
}
}
}

10
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs

@ -0,0 +1,10 @@
using System;
using Confluent.Kafka;
namespace Volo.Abp.Kafka
{
public interface IConsumerPool : IDisposable
{
IConsumer<string, byte[]> Get(string groupId, string connectionName = null);
}
}

11
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs

@ -0,0 +1,11 @@
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace Volo.Abp.Kafka
{
public interface IKafkaMessageConsumer
{
void OnMessageReceived(Func<Message<string, byte[]>, Task> callback);
}
}

19
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs

@ -0,0 +1,19 @@
namespace Volo.Abp.Kafka
{
public interface IKafkaMessageConsumerFactory
{
/// <summary>
/// Creates a new <see cref="IKafkaMessageConsumer"/>.
/// Avoid to create too many consumers since they are
/// not disposed until end of the application.
/// </summary>
/// <param name="topicName"></param>
/// <param name="groupId"></param>
/// <param name="connectionName"></param>
/// <returns></returns>
IKafkaMessageConsumer Create(
string topicName,
string groupId,
string connectionName = null);
}
}

11
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs

@ -0,0 +1,11 @@
using System;
namespace Volo.Abp.Kafka
{
public interface IKafkaSerializer
{
byte[] Serialize(object obj);
object Deserialize(byte[] value, Type type);
}
}

10
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs

@ -0,0 +1,10 @@
using System;
using Confluent.Kafka;
namespace Volo.Abp.Kafka
{
public interface IProducerPool : IDisposable
{
IProducer<string, byte[]> Get(string connectionName = null);
}
}

35
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs

@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using JetBrains.Annotations;
namespace Volo.Abp.Kafka
{
[Serializable]
public class KafkaConnections : Dictionary<string, ClientConfig>
{
public const string DefaultConnectionName = "Default";
[NotNull]
public ClientConfig Default
{
get => this[DefaultConnectionName];
set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value));
}
public KafkaConnections()
{
Default = new ClientConfig();
}
public ClientConfig GetOrDefault(string connectionName)
{
if (TryGetValue(connectionName, out var connectionFactory))
{
return connectionFactory;
}
return Default;
}
}
}

156
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs

@ -0,0 +1,156 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Threading;
namespace Volo.Abp.Kafka
{
public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency, IDisposable
{
public ILogger<KafkaMessageConsumer> Logger { get; set; }
protected IConsumerPool ConsumerPool { get; }
protected IExceptionNotifier ExceptionNotifier { get; }
protected AbpKafkaOptions Options { get; }
protected ConcurrentBag<Func<Message<string, byte[]>, Task>> Callbacks { get; }
protected IConsumer<string, byte[]> Consumer { get; private set; }
protected string ConnectionName { get; private set; }
protected string GroupId { get; private set; }
protected string TopicName { get; private set; }
public KafkaMessageConsumer(
IConsumerPool consumerPool,
IExceptionNotifier exceptionNotifier,
IOptions<AbpKafkaOptions> options)
{
ConsumerPool = consumerPool;
ExceptionNotifier = exceptionNotifier;
Options = options.Value;
Logger = NullLogger<KafkaMessageConsumer>.Instance;
Callbacks = new ConcurrentBag<Func<Message<string, byte[]>, Task>>();
}
public virtual void Initialize(
[NotNull] string topicName,
[NotNull] string groupId,
string connectionName = null)
{
Check.NotNull(topicName, nameof(topicName));
Check.NotNull(groupId, nameof(groupId));
TopicName = topicName;
ConnectionName = connectionName ?? KafkaConnections.DefaultConnectionName;
GroupId = groupId;
AsyncHelper.RunSync(CreateTopicAsync);
Consume();
}
public virtual void OnMessageReceived(Func<Message<string, byte[]>, Task> callback)
{
Callbacks.Add(callback);
}
protected virtual async Task CreateTopicAsync()
{
using (var adminClient = new AdminClientBuilder(Options.Connections.GetOrDefault(ConnectionName)).Build())
{
var topic = new TopicSpecification
{
Name = TopicName,
NumPartitions = 1,
ReplicationFactor = 1
};
Options.ConfigureTopic?.Invoke(topic);
try
{
await adminClient.CreateTopicsAsync(new[] {topic});
}
catch (CreateTopicsException e)
{
if (!e.Error.Reason.Contains($"Topic '{TopicName}' already exists"))
{
throw;
}
}
}
}
protected virtual void Consume()
{
Consumer = ConsumerPool.Get(GroupId, ConnectionName);
Task.Factory.StartNew(async () =>
{
Consumer.Subscribe(TopicName);
while (true)
{
try
{
var consumeResult = Consumer.Consume();
if (consumeResult.IsPartitionEOF)
{
continue;
}
await HandleIncomingMessage(consumeResult);
}
catch (ConsumeException ex)
{
Logger.LogException(ex, LogLevel.Warning);
AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning));
}
}
});
}
protected virtual async Task HandleIncomingMessage(ConsumeResult<string, byte[]> consumeResult)
{
try
{
foreach (var callback in Callbacks)
{
await callback(consumeResult.Message);
}
Consumer.Commit(consumeResult);
}
catch (Exception ex)
{
Logger.LogException(ex);
await ExceptionNotifier.NotifyAsync(ex);
}
}
public virtual void Dispose()
{
if (Consumer == null)
{
return;
}
Consumer.Close();
Consumer.Dispose();
}
}
}

32
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs

@ -0,0 +1,32 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Kafka
{
public class KafkaMessageConsumerFactory : IKafkaMessageConsumerFactory, ISingletonDependency, IDisposable
{
protected IServiceScope ServiceScope { get; }
public KafkaMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory)
{
ServiceScope = serviceScopeFactory.CreateScope();
}
public IKafkaMessageConsumer Create(
string topicName,
string groupId,
string connectionName = null)
{
var consumer = ServiceScope.ServiceProvider.GetRequiredService<KafkaMessageConsumer>();
consumer.Initialize(topicName, groupId, connectionName);
return consumer;
}
public void Dispose()
{
ServiceScope?.Dispose();
}
}
}

102
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs

@ -0,0 +1,102 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Kafka
{
public class ProducerPool : IProducerPool, ISingletonDependency
{
protected AbpKafkaOptions Options { get; }
protected ConcurrentDictionary<string, IProducer<string, byte[]>> Producers { get; }
protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
public ILogger<ProducerPool> Logger { get; set; }
private bool _isDisposed;
public ProducerPool(IOptions<AbpKafkaOptions> options)
{
Options = options.Value;
Producers = new ConcurrentDictionary<string, IProducer<string, byte[]>>();
Logger = new NullLogger<ProducerPool>();
}
public virtual IProducer<string, byte[]> Get(string connectionName = null)
{
connectionName ??= KafkaConnections.DefaultConnectionName;
return Producers.GetOrAdd(
connectionName, connection =>
{
var config = Options.Connections.GetOrDefault(connection);
Options.ConfigureProducer?.Invoke(new ProducerConfig(config));
return new ProducerBuilder<string, byte[]>(config).Build();
});
}
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!Producers.Any())
{
Logger.LogDebug($"Disposed producer pool with no producers in the pool.");
return;
}
var poolDisposeStopwatch = Stopwatch.StartNew();
Logger.LogInformation($"Disposing producer pool ({Producers.Count} producers).");
var remainingWaitDuration = TotalDisposeWaitDuration;
foreach (var producer in Producers.Values)
{
var poolItemDisposeStopwatch = Stopwatch.StartNew();
try
{
producer.Dispose();
}
catch
{
}
poolItemDisposeStopwatch.Stop();
remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed
? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed)
: TimeSpan.Zero;
}
poolDisposeStopwatch.Stop();
Logger.LogInformation(
$"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms).");
if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0)
{
Logger.LogWarning(
$"Disposing Kafka Producer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms.");
}
Producers.Clear();
}
}
}

27
framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs

@ -0,0 +1,27 @@
using System;
using System.Text;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Json;
namespace Volo.Abp.Kafka
{
public class Utf8JsonKafkaSerializer : IKafkaSerializer, ITransientDependency
{
private readonly IJsonSerializer _jsonSerializer;
public Utf8JsonKafkaSerializer(IJsonSerializer jsonSerializer)
{
_jsonSerializer = jsonSerializer;
}
public byte[] Serialize(object obj)
{
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
}
public object Deserialize(byte[] value, Type type)
{
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value));
}
}
}

2
nupkg/common.ps1

@ -84,6 +84,7 @@ $projects = (
"framework/src/Volo.Abp.EntityFrameworkCore.SqlServer",
"framework/src/Volo.Abp.EventBus",
"framework/src/Volo.Abp.EventBus.RabbitMQ",
"framework/src/Volo.Abp.EventBus.Kafka",
"framework/src/Volo.Abp.Features",
"framework/src/Volo.Abp.FluentValidation",
"framework/src/Volo.Abp.GlobalFeatures",
@ -123,6 +124,7 @@ $projects = (
"framework/src/Volo.Abp.Validation.Abstractions",
"framework/src/Volo.Abp.Validation",
"framework/src/Volo.Abp.VirtualFileSystem",
"framework/src/Volo.Abp.Kafka"
# modules/account
"modules/account/src/Volo.Abp.Account.Application.Contracts",

Loading…
Cancel
Save