mirror of https://github.com/abpframework/abp.git
37 changed files with 1146 additions and 8 deletions
@ -0,0 +1,133 @@ |
|||
# Distributed Event Bus Azure Integration |
|||
|
|||
> This document explains **how to configure the [Azure Service Bus](https://azure.microsoft.com/en-us/services/service-bus/)** as the distributed event bus provider. See the [distributed event bus document](Distributed-Event-Bus.md) to learn how to use the distributed event bus system |
|||
|
|||
## Installation |
|||
|
|||
Use the ABP CLI to add [Volo.Abp.EventBus.Azure](https://www.nuget.org/packages/Volo.Abp.EventBus.Azure) NuGet package to your project: |
|||
|
|||
* Install the [ABP CLI](https://docs.abp.io/en/abp/latest/CLI) if you haven't installed before. |
|||
* Open a command line (terminal) in the directory of the `.csproj` file you want to add the `Volo.Abp.EventBus.Azure` package. |
|||
* Run `abp add-package Volo.Abp.EventBus.Azure` command. |
|||
|
|||
If you want to do it manually, install the [Volo.Abp.EventBus.Azure](https://www.nuget.org/packages/Volo.Abp.EventBus.Azure) NuGet package to your project and add `[DependsOn(typeof(AbpEventBusAzureModule))]` to the [ABP module](Module-Development-Basics.md) class inside your project. |
|||
|
|||
## Configuration |
|||
|
|||
You can configure using the standard [configuration system](Configuration.md), like using the `appsettings.json` file, or using the [options](Options.md) classes. |
|||
|
|||
### `appsettings.json` file configuration |
|||
|
|||
This is the simplest way to configure the Azure Service Bus settings. It is also very strong since you can use any other configuration source (like environment variables) that is [supported by the AspNet Core](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/configuration/). |
|||
|
|||
**Example: The minimal configuration to connect to Azure Service Bus Namespace with default configurations** |
|||
|
|||
````json |
|||
{ |
|||
"Azure": { |
|||
"ServiceBus": { |
|||
"Connections": { |
|||
"Default": { |
|||
"ConnectionString": "Endpoint=sb://sb-my-app.servicebus.windows.net/;SharedAccessKeyName={{Policy Name}};SharedAccessKey={};EntityPath=marketing-consent" |
|||
} |
|||
} |
|||
}, |
|||
"EventBus": { |
|||
"ConnectionName": "Default", |
|||
"SubscriberName": "MySubscriberName", |
|||
"TopicName": "MyTopicName" |
|||
} |
|||
} |
|||
} |
|||
```` |
|||
|
|||
* `MySubscriberName` is the name of this subscription, which is used as the **Subscriber** on the Azure Service Bus. |
|||
* `MyTopicName` is the **topic name**. |
|||
|
|||
See [the Azure Service Bus document](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions) to understand these options better. |
|||
|
|||
#### Connections |
|||
|
|||
If you need to connect to another Azure Service Bus Namespace the Default, you need to configure the connection properties. |
|||
|
|||
**Example: Declare two connections and use one of them for the event bus** |
|||
|
|||
````json |
|||
{ |
|||
"Azure": { |
|||
"ServiceBus": { |
|||
"Connections": { |
|||
"Default": { |
|||
"ConnectionString": "Endpoint=sb://sb-my-app.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={{SharedAccessKey}}" |
|||
}, |
|||
"SecondConnection": { |
|||
"ConnectionString": "Endpoint=sb://sb-my-app.servicebus.windows.net/;SharedAccessKeyName={{Policy Name}};SharedAccessKey={{SharedAccessKey}}" |
|||
} |
|||
} |
|||
}, |
|||
"EventBus": { |
|||
"ConnectionName": "SecondConnection", |
|||
"SubscriberName": "MySubscriberName", |
|||
"TopicName": "MyTopicName" |
|||
} |
|||
} |
|||
} |
|||
```` |
|||
|
|||
This allows you to use multiple Azure Service Bus namespaces in your application, but select one of them for the event bus. |
|||
|
|||
You can use any of the [ServiceBusAdministrationClientOptions](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.administration.servicebusadministrationclientoptions?view=azure-dotnet), [ServiceBusClientOptions](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusclientoptions?view=azure-dotnet), [ServiceBusProcessorOptions](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions?view=azure-dotnet) properties for the connection. |
|||
|
|||
**Example: Specify the Admin, Client and Processor options** |
|||
|
|||
````json |
|||
{ |
|||
"Azure": { |
|||
"ServiceBus": { |
|||
"Connections": { |
|||
"Default": { |
|||
"ConnectionString": "Endpoint=sb://sb-my-app.servicebus.windows.net/;SharedAccessKeyName={{Policy Name}};SharedAccessKey={};EntityPath=marketing-consent", |
|||
"Admin": { |
|||
"Retry": { |
|||
"MaxRetries": 3 |
|||
} |
|||
}, |
|||
"Client": { |
|||
"RetryOptions": { |
|||
"MaxRetries": 1 |
|||
} |
|||
}, |
|||
"Processor": { |
|||
"AutoCompleteMessages": true, |
|||
"ReceiveMode": "ReceiveAndDelete" |
|||
} |
|||
} |
|||
} |
|||
}, |
|||
"EventBus": { |
|||
"ConnectionName": "Default", |
|||
"SubscriberName": "MySubscriberName", |
|||
"TopicName": "MyTopicName" |
|||
} |
|||
} |
|||
} |
|||
```` |
|||
|
|||
### The Options Classes |
|||
|
|||
`AbpAzureServiceBusOptions` and `AbpAzureEventBusOptions` classes can be used to configure the connection strings and event bus options for Azure Service Bus. |
|||
|
|||
You can configure this options inside the `ConfigureServices` of your [module](Module-Development-Basics.md). |
|||
|
|||
**Example: Configure the connection** |
|||
|
|||
````csharp |
|||
Configure<AbpAzureServiceBusOptions>(options => |
|||
{ |
|||
options.Connections.Default.ConnectionString = "Endpoint=sb://sb-my-app.servicebus.windows.net/;SharedAccessKeyName={{Policy Name}};SharedAccessKey={}"; |
|||
options.Connections.Default.Admin.Retry.MaxRetries = 3; |
|||
options.Connections.Default.Client.RetryOptions.MaxRetries = 1; |
|||
}); |
|||
```` |
|||
|
|||
Using these options classes can be combined with the `appsettings.json` way. Configuring an option property in the code overrides the value in the configuration file. |
|||
@ -0,0 +1,3 @@ |
|||
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd"> |
|||
<ConfigureAwait ContinueOnCapturedContext="false" /> |
|||
</Weavers> |
|||
@ -0,0 +1,30 @@ |
|||
<?xml version="1.0" encoding="utf-8"?> |
|||
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> |
|||
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. --> |
|||
<xs:element name="Weavers"> |
|||
<xs:complexType> |
|||
<xs:all> |
|||
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1"> |
|||
<xs:complexType> |
|||
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" /> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:all> |
|||
<xs:attribute name="VerifyAssembly" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="VerifyIgnoreCodes" type="xs:string"> |
|||
<xs:annotation> |
|||
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="GenerateXsd" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:schema> |
|||
@ -0,0 +1,23 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
|
|||
<Import Project="..\..\..\configureawait.props" /> |
|||
<Import Project="..\..\..\common.props" /> |
|||
|
|||
<PropertyGroup> |
|||
<TargetFramework>netstandard2.0</TargetFramework> |
|||
<AssemblyName>Volo.Abp.AzureServiceBus</AssemblyName> |
|||
<PackageId>Volo.Abp.AzureServiceBus</PackageId> |
|||
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback> |
|||
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute> |
|||
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute> |
|||
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute> |
|||
<RootNamespace /> |
|||
</PropertyGroup> |
|||
|
|||
<ItemGroup> |
|||
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.1.1" /> |
|||
<ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" /> |
|||
<ProjectReference Include="..\Volo.Abp.Threading\Volo.Abp.Threading.csproj" /> |
|||
</ItemGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,20 @@ |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Volo.Abp.Json; |
|||
using Volo.Abp.Modularity; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
[DependsOn( |
|||
typeof(AbpJsonModule), |
|||
typeof(AbpThreadingModule) |
|||
)] |
|||
public class AbpAzureServiceBusModule : AbpModule |
|||
{ |
|||
public override void ConfigureServices(ServiceConfigurationContext context) |
|||
{ |
|||
var configuration = context.Services.GetConfiguration(); |
|||
Configure<AbpAzureServiceBusOptions>(configuration.GetSection("Azure:ServiceBus")); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,12 @@ |
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class AbpAzureServiceBusOptions |
|||
{ |
|||
public AzureServiceBusConnections Connections { get; } |
|||
|
|||
public AbpAzureServiceBusOptions() |
|||
{ |
|||
Connections = new AzureServiceBusConnections(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using JetBrains.Annotations; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
[Serializable] |
|||
public class AzureServiceBusConnections : Dictionary<string, ClientConfig> |
|||
{ |
|||
public const string DefaultConnectionName = "Default"; |
|||
|
|||
[NotNull] |
|||
public ClientConfig Default |
|||
{ |
|||
get => this[DefaultConnectionName]; |
|||
set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value)); |
|||
} |
|||
|
|||
public AzureServiceBusConnections() |
|||
{ |
|||
Default = new ClientConfig(); |
|||
} |
|||
|
|||
public ClientConfig GetOrDefault(string connectionName) |
|||
{ |
|||
return TryGetValue(connectionName, out var connectionFactory) |
|||
? connectionFactory |
|||
: Default; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,104 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
using JetBrains.Annotations; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.ExceptionHandling; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class AzureServiceBusMessageConsumer : IAzureServiceBusMessageConsumer, ITransientDependency |
|||
{ |
|||
public ILogger<AzureServiceBusMessageConsumer> Logger { get; set; } |
|||
|
|||
private readonly IExceptionNotifier _exceptionNotifier; |
|||
private readonly IProcessorPool _processorPool; |
|||
private readonly ConcurrentBag<Func<ServiceBusReceivedMessage, Task>> _callbacks; |
|||
private string _connectionName; |
|||
private string _subscriptionName; |
|||
private string _topicName; |
|||
|
|||
public AzureServiceBusMessageConsumer( |
|||
IExceptionNotifier exceptionNotifier, |
|||
IProcessorPool processorPool) |
|||
{ |
|||
_exceptionNotifier = exceptionNotifier; |
|||
_processorPool = processorPool; |
|||
Logger = NullLogger<AzureServiceBusMessageConsumer>.Instance; |
|||
_callbacks = new ConcurrentBag<Func<ServiceBusReceivedMessage, Task>>(); |
|||
} |
|||
|
|||
public virtual void Initialize( |
|||
[NotNull] string topicName, |
|||
[NotNull] string subscriptionName, |
|||
string connectionName) |
|||
{ |
|||
Check.NotNull(topicName, nameof(topicName)); |
|||
Check.NotNull(subscriptionName, nameof(subscriptionName)); |
|||
|
|||
_topicName = topicName; |
|||
_connectionName = connectionName ?? AzureServiceBusConnections.DefaultConnectionName; |
|||
_subscriptionName = subscriptionName; |
|||
StartProcessing(); |
|||
} |
|||
|
|||
public void OnMessageReceived(Func<ServiceBusReceivedMessage, Task> callback) |
|||
{ |
|||
_callbacks.Add(callback); |
|||
} |
|||
|
|||
protected virtual void StartProcessing() |
|||
{ |
|||
Task.Factory.StartNew(function: async () => |
|||
{ |
|||
var serviceBusProcessor = await _processorPool.GetAsync(_subscriptionName, _topicName, _connectionName); |
|||
serviceBusProcessor.ProcessErrorAsync += HandleIncomingError; |
|||
serviceBusProcessor.ProcessMessageAsync += HandleIncomingMessage; |
|||
|
|||
if (!serviceBusProcessor.IsProcessing) |
|||
{ |
|||
await serviceBusProcessor.StartProcessingAsync(); |
|||
} |
|||
|
|||
while (true) |
|||
{ |
|||
Thread.Sleep(1000); |
|||
} |
|||
}, TaskCreationOptions.LongRunning); |
|||
} |
|||
|
|||
protected virtual async Task HandleIncomingMessage(ProcessMessageEventArgs args) |
|||
{ |
|||
try |
|||
{ |
|||
foreach (var callback in _callbacks) |
|||
{ |
|||
await callback(args.Message); |
|||
} |
|||
|
|||
await args.CompleteMessageAsync(args.Message); |
|||
} |
|||
catch (Exception exception) |
|||
{ |
|||
await HandleError(exception); |
|||
} |
|||
} |
|||
|
|||
protected virtual async Task HandleIncomingError(ProcessErrorEventArgs args) |
|||
{ |
|||
await HandleError(args.Exception); |
|||
} |
|||
|
|||
protected virtual async Task HandleError(Exception exception) |
|||
{ |
|||
Logger.LogException(exception); |
|||
await _exceptionNotifier.NotifyAsync(exception); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class AzureServiceBusMessageConsumerFactory : IAzureServiceBusMessageConsumerFactory, ISingletonDependency, IDisposable |
|||
{ |
|||
protected IServiceScope ServiceScope { get; } |
|||
|
|||
public AzureServiceBusMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory) |
|||
{ |
|||
ServiceScope = serviceScopeFactory.CreateScope(); |
|||
} |
|||
|
|||
public IAzureServiceBusMessageConsumer CreateMessageConsumer(string topicName, string subscriptionName, string connectionName) |
|||
{ |
|||
var processor = ServiceScope.ServiceProvider.GetRequiredService<AzureServiceBusMessageConsumer>(); |
|||
processor.Initialize(topicName, subscriptionName, connectionName); |
|||
return processor; |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
ServiceScope?.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
using Azure.Messaging.ServiceBus; |
|||
using Azure.Messaging.ServiceBus.Administration; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class ClientConfig |
|||
{ |
|||
public string ConnectionString { get; set; } |
|||
|
|||
public ServiceBusAdministrationClientOptions Admin { get; set; } = new(); |
|||
|
|||
public ServiceBusClientOptions Client { get; set; } = new(); |
|||
|
|||
public ServiceBusProcessorOptions Processor { get; set; } = new(); |
|||
} |
|||
} |
|||
@ -0,0 +1,88 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
using Azure.Messaging.ServiceBus.Administration; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class ConnectionPool : IConnectionPool, ISingletonDependency |
|||
{ |
|||
public ILogger<ConnectionPool> Logger { get; set; } |
|||
|
|||
private bool _isDisposed; |
|||
private readonly AbpAzureServiceBusOptions _options; |
|||
private readonly ConcurrentDictionary<string, Lazy<ServiceBusClient>> _clients; |
|||
private readonly ConcurrentDictionary<string, Lazy<ServiceBusAdministrationClient>> _adminClients; |
|||
|
|||
public ConnectionPool(IOptions<AbpAzureServiceBusOptions> options) |
|||
{ |
|||
_options = options.Value; |
|||
_clients = new ConcurrentDictionary<string, Lazy<ServiceBusClient>>(); |
|||
_adminClients = new ConcurrentDictionary<string, Lazy<ServiceBusAdministrationClient>>(); |
|||
Logger = new NullLogger<ConnectionPool>(); |
|||
} |
|||
|
|||
public ServiceBusClient GetClient(string connectionName) |
|||
{ |
|||
connectionName ??= AzureServiceBusConnections.DefaultConnectionName; |
|||
return _clients.GetOrAdd( |
|||
connectionName, new Lazy<ServiceBusClient>(() => |
|||
{ |
|||
var config = _options.Connections.GetOrDefault(connectionName); |
|||
return new ServiceBusClient(config.ConnectionString, config.Client); |
|||
}) |
|||
).Value; |
|||
} |
|||
|
|||
public ServiceBusAdministrationClient GetAdministrationClient(string connectionName) |
|||
{ |
|||
connectionName ??= AzureServiceBusConnections.DefaultConnectionName; |
|||
return _adminClients.GetOrAdd( |
|||
connectionName, new Lazy<ServiceBusAdministrationClient>(() => |
|||
{ |
|||
var config = _options.Connections.GetOrDefault(connectionName); |
|||
return new ServiceBusAdministrationClient(config.ConnectionString); |
|||
}) |
|||
).Value; |
|||
} |
|||
|
|||
public async ValueTask DisposeAsync() |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
_isDisposed = true; |
|||
if (!_clients.Any()) |
|||
{ |
|||
Logger.LogDebug($"Disposed connection pool with no connection in the pool."); |
|||
return; |
|||
} |
|||
|
|||
Logger.LogInformation($"Disposing connection pool ({_clients.Count} connections)."); |
|||
|
|||
foreach (var connection in _clients.Values) |
|||
{ |
|||
await connection.Value.DisposeAsync(); |
|||
} |
|||
|
|||
_clients.Clear(); |
|||
|
|||
if (!_adminClients.Any()) |
|||
{ |
|||
Logger.LogDebug($"Disposed admin connection pool with no admin connection in the pool."); |
|||
return; |
|||
} |
|||
|
|||
Logger.LogInformation($"Disposing admin connection pool ({_adminClients.Count} admin connections)."); |
|||
_adminClients.Clear(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,11 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public interface IAzureServiceBusMessageConsumer |
|||
{ |
|||
void OnMessageReceived(Func<ServiceBusReceivedMessage, Task> callback); |
|||
} |
|||
} |
|||
@ -0,0 +1,21 @@ |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public interface IAzureServiceBusMessageConsumerFactory |
|||
{ |
|||
/// <summary>
|
|||
/// Creates a new <see cref="IAzureServiceBusMessageConsumerFactory"/>.
|
|||
/// Avoid to create too many consumers since they are
|
|||
/// not disposed until end of the application.
|
|||
/// </summary>
|
|||
/// <param name="topicName"></param>
|
|||
/// <param name="subscriptionName"></param>
|
|||
/// <param name="connectionName"></param>
|
|||
/// <returns></returns>
|
|||
IAzureServiceBusMessageConsumer CreateMessageConsumer( |
|||
string topicName, |
|||
string subscriptionName, |
|||
string connectionName); |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using System; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public interface IAzureServiceBusSerializer |
|||
{ |
|||
byte[] Serialize(object obj); |
|||
|
|||
object Deserialize(byte[] value, Type type); |
|||
|
|||
T Deserialize<T>(byte[] value); |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using System; |
|||
using Azure.Messaging.ServiceBus; |
|||
using Azure.Messaging.ServiceBus.Administration; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public interface IConnectionPool : IAsyncDisposable |
|||
{ |
|||
ServiceBusClient GetClient(string connectionName); |
|||
|
|||
ServiceBusAdministrationClient GetAdministrationClient(string connectionName); |
|||
} |
|||
} |
|||
@ -0,0 +1,11 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public interface IProcessorPool : IAsyncDisposable |
|||
{ |
|||
Task<ServiceBusProcessor> GetAsync(string subscriptionName, string topicName, string connectionName); |
|||
} |
|||
} |
|||
@ -0,0 +1,11 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public interface IPublisherPool : IAsyncDisposable |
|||
{ |
|||
Task<ServiceBusSender> GetAsync(string topicName, string connectionName); |
|||
} |
|||
} |
|||
@ -0,0 +1,82 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class ProcessorPool : IProcessorPool, ISingletonDependency |
|||
{ |
|||
public ILogger<ProcessorPool> Logger { get; set; } |
|||
|
|||
private bool _isDisposed; |
|||
private readonly AbpAzureServiceBusOptions _options; |
|||
private readonly IConnectionPool _connectionPool; |
|||
private readonly ConcurrentDictionary<string, Lazy<ServiceBusProcessor>> _processors; |
|||
|
|||
public ProcessorPool( |
|||
IOptions<AbpAzureServiceBusOptions> options, |
|||
IConnectionPool connectionPool) |
|||
{ |
|||
_options = options.Value; |
|||
_connectionPool = connectionPool; |
|||
_processors = new ConcurrentDictionary<string, Lazy<ServiceBusProcessor>>(); |
|||
Logger = new NullLogger<ProcessorPool>(); |
|||
} |
|||
|
|||
public async Task<ServiceBusProcessor> GetAsync(string subscriptionName, string topicName, string connectionName) |
|||
{ |
|||
var admin = _connectionPool.GetAdministrationClient(connectionName); |
|||
await admin.SetupSubscriptionAsync(topicName, subscriptionName); |
|||
|
|||
return _processors.GetOrAdd( |
|||
$"{topicName}-{subscriptionName}", new Lazy<ServiceBusProcessor>(() => |
|||
{ |
|||
var config = _options.Connections.GetOrDefault(connectionName); |
|||
var client = _connectionPool.GetClient(connectionName); |
|||
return client.CreateProcessor(topicName, subscriptionName, config.Processor); |
|||
}) |
|||
).Value; |
|||
} |
|||
|
|||
public async ValueTask DisposeAsync() |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
_isDisposed = true; |
|||
if (!_processors.Any()) |
|||
{ |
|||
Logger.LogDebug($"Disposed processor pool with no processors in the pool."); |
|||
return; |
|||
} |
|||
|
|||
Logger.LogInformation($"Disposing processor pool ({_processors.Count} processors)."); |
|||
|
|||
foreach (var item in _processors.Values) |
|||
{ |
|||
var processor = item.Value; |
|||
if (processor.IsProcessing) |
|||
{ |
|||
await processor.StopProcessingAsync(); |
|||
} |
|||
|
|||
if (!processor.IsClosed) |
|||
{ |
|||
await processor.CloseAsync(); |
|||
} |
|||
|
|||
await processor.DisposeAsync(); |
|||
} |
|||
|
|||
_processors.Clear(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,66 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class PublisherPool : IPublisherPool, ISingletonDependency |
|||
{ |
|||
public ILogger<PublisherPool> Logger { get; set; } |
|||
|
|||
private bool _isDisposed; |
|||
private readonly IConnectionPool _connectionPool; |
|||
private readonly ConcurrentDictionary<string, Lazy<ServiceBusSender>> _publishers; |
|||
|
|||
public PublisherPool(IConnectionPool connectionPool) |
|||
{ |
|||
_connectionPool = connectionPool; |
|||
_publishers = new ConcurrentDictionary<string, Lazy<ServiceBusSender>>(); |
|||
Logger = new NullLogger<PublisherPool>(); |
|||
} |
|||
|
|||
public async Task<ServiceBusSender> GetAsync(string topicName, string connectionName) |
|||
{ |
|||
var admin = _connectionPool.GetAdministrationClient(connectionName); |
|||
await admin.SetupTopicAsync(topicName); |
|||
|
|||
return _publishers.GetOrAdd( |
|||
topicName, new Lazy<ServiceBusSender>(() => |
|||
{ |
|||
var client = _connectionPool.GetClient(connectionName); |
|||
return client.CreateSender(topicName); |
|||
}) |
|||
).Value; |
|||
} |
|||
|
|||
public async ValueTask DisposeAsync() |
|||
{ |
|||
if (_isDisposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
_isDisposed = true; |
|||
if (!_publishers.Any()) |
|||
{ |
|||
Logger.LogDebug($"Disposed publisher pool with no publisher in the pool."); |
|||
return; |
|||
} |
|||
|
|||
Logger.LogInformation($"Disposing publisher pool ({_publishers.Count} publishers)."); |
|||
|
|||
foreach (var publisher in _publishers.Values) |
|||
{ |
|||
await publisher.Value.CloseAsync(); |
|||
await publisher.Value.DisposeAsync(); |
|||
} |
|||
|
|||
_publishers.Clear(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus.Administration; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public static class ServiceBusAdministrationClientExtensions |
|||
{ |
|||
public static async Task SetupTopicAsync(this ServiceBusAdministrationClient client, string topicName) |
|||
{ |
|||
if (!await client.TopicExistsAsync(topicName)) |
|||
{ |
|||
await client.CreateTopicAsync(topicName); |
|||
} |
|||
} |
|||
|
|||
public static async Task SetupSubscriptionAsync(this ServiceBusAdministrationClient client, string topicName, string subscriptionName) |
|||
{ |
|||
await client.SetupTopicAsync(topicName); |
|||
if (!await client.SubscriptionExistsAsync(topicName, subscriptionName)) |
|||
{ |
|||
await client.CreateSubscriptionAsync(topicName, subscriptionName); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
using System; |
|||
using System.Text; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Json; |
|||
|
|||
namespace Volo.Abp.AzureServiceBus |
|||
{ |
|||
public class Utf8JsonAzureServiceBusSerializer : IAzureServiceBusSerializer, ITransientDependency |
|||
{ |
|||
private readonly IJsonSerializer _jsonSerializer; |
|||
|
|||
public Utf8JsonAzureServiceBusSerializer(IJsonSerializer jsonSerializer) |
|||
{ |
|||
_jsonSerializer = jsonSerializer; |
|||
} |
|||
|
|||
public byte[] Serialize(object obj) |
|||
{ |
|||
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); |
|||
} |
|||
|
|||
public object Deserialize(byte[] value, Type type) |
|||
{ |
|||
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); |
|||
} |
|||
|
|||
public T Deserialize<T>(byte[] value) |
|||
{ |
|||
return _jsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(value)); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,3 @@ |
|||
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd"> |
|||
<ConfigureAwait ContinueOnCapturedContext="false" /> |
|||
</Weavers> |
|||
@ -0,0 +1,30 @@ |
|||
<?xml version="1.0" encoding="utf-8"?> |
|||
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> |
|||
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. --> |
|||
<xs:element name="Weavers"> |
|||
<xs:complexType> |
|||
<xs:all> |
|||
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1"> |
|||
<xs:complexType> |
|||
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" /> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:all> |
|||
<xs:attribute name="VerifyAssembly" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="VerifyIgnoreCodes" type="xs:string"> |
|||
<xs:annotation> |
|||
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="GenerateXsd" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:schema> |
|||
@ -0,0 +1,22 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
|
|||
<Import Project="..\..\..\configureawait.props" /> |
|||
<Import Project="..\..\..\common.props" /> |
|||
|
|||
<PropertyGroup> |
|||
<TargetFramework>netstandard2.0</TargetFramework> |
|||
<AssemblyName>Volo.Abp.EventBus.Azure</AssemblyName> |
|||
<PackageId>Volo.Abp.EventBus.Azure</PackageId> |
|||
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback> |
|||
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute> |
|||
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute> |
|||
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute> |
|||
<RootNamespace /> |
|||
</PropertyGroup> |
|||
|
|||
<ItemGroup> |
|||
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" /> |
|||
<ProjectReference Include="..\Volo.Abp.AzureServiceBus\Volo.Abp.AzureServiceBus.csproj" /> |
|||
</ItemGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,11 @@ |
|||
namespace Volo.Abp.EventBus.Azure |
|||
{ |
|||
public class AbpAzureEventBusOptions |
|||
{ |
|||
public string ConnectionName { get; set; } |
|||
|
|||
public string SubscriberName { get; set; } |
|||
|
|||
public string TopicName { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Volo.Abp.AzureServiceBus; |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace Volo.Abp.EventBus.Azure |
|||
{ |
|||
[DependsOn( |
|||
typeof(AbpEventBusModule), |
|||
typeof(AbpAzureServiceBusModule) |
|||
)] |
|||
public class AbpEventBusAzureModule : AbpModule |
|||
{ |
|||
public override void ConfigureServices(ServiceConfigurationContext context) |
|||
{ |
|||
var configuration = context.Services.GetConfiguration(); |
|||
|
|||
Configure<AbpAzureEventBusOptions>(configuration.GetSection("Azure:EventBus")); |
|||
} |
|||
|
|||
public override void OnApplicationInitialization(ApplicationInitializationContext context) |
|||
{ |
|||
context |
|||
.ServiceProvider |
|||
.GetRequiredService<AzureDistributedEventBus>() |
|||
.Initialize(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,234 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Azure.Messaging.ServiceBus; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.AzureServiceBus; |
|||
using Volo.Abp.Guids; |
|||
using Volo.Abp.MultiTenancy; |
|||
using Volo.Abp.Threading; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EventBus.Azure |
|||
{ |
|||
[Dependency(ReplaceServices = true)] |
|||
[ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus))] |
|||
public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDependency |
|||
{ |
|||
private readonly AbpAzureEventBusOptions _options; |
|||
private readonly IAzureServiceBusMessageConsumerFactory _messageConsumerFactory; |
|||
private readonly IPublisherPool _publisherPool; |
|||
private readonly IAzureServiceBusSerializer _serializer; |
|||
private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories; |
|||
private readonly ConcurrentDictionary<string, Type> _eventTypes; |
|||
private IAzureServiceBusMessageConsumer _consumer; |
|||
|
|||
public AzureDistributedEventBus( |
|||
IServiceScopeFactory serviceScopeFactory, |
|||
ICurrentTenant currentTenant, |
|||
IUnitOfWorkManager unitOfWorkManager, |
|||
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions, |
|||
IGuidGenerator guidGenerator, |
|||
IClock clock, |
|||
IOptions<AbpAzureEventBusOptions> abpAzureEventBusOptions, |
|||
IAzureServiceBusSerializer serializer, |
|||
IAzureServiceBusMessageConsumerFactory messageConsumerFactory, |
|||
IPublisherPool publisherPool) |
|||
: base(serviceScopeFactory, |
|||
currentTenant, |
|||
unitOfWorkManager, |
|||
abpDistributedEventBusOptions, |
|||
guidGenerator, |
|||
clock) |
|||
{ |
|||
_options = abpAzureEventBusOptions.Value; |
|||
_serializer = serializer; |
|||
_messageConsumerFactory = messageConsumerFactory; |
|||
_publisherPool = publisherPool; |
|||
_handlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); |
|||
_eventTypes = new ConcurrentDictionary<string, Type>(); |
|||
} |
|||
|
|||
public void Initialize() |
|||
{ |
|||
_consumer = _messageConsumerFactory.CreateMessageConsumer( |
|||
_options.TopicName, |
|||
_options.SubscriberName, |
|||
_options.ConnectionName); |
|||
|
|||
_consumer.OnMessageReceived(ProcessEventAsync); |
|||
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); |
|||
} |
|||
|
|||
private async Task ProcessEventAsync(ServiceBusReceivedMessage message) |
|||
{ |
|||
var eventName = message.Subject; |
|||
var eventType = _eventTypes.GetOrDefault(eventName); |
|||
if (eventType == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
if (await AddToInboxAsync(message.MessageId, eventName, eventType, message.Body.ToArray())) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var eventData = _serializer.Deserialize(message.Body.ToArray(), eventType); |
|||
|
|||
await TriggerHandlersAsync(eventType, eventData); |
|||
} |
|||
|
|||
public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) |
|||
{ |
|||
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData); |
|||
} |
|||
|
|||
public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) |
|||
{ |
|||
var eventType = _eventTypes.GetOrDefault(incomingEvent.EventName); |
|||
if (eventType == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var eventData = _serializer.Deserialize(incomingEvent.EventData, eventType); |
|||
var exceptions = new List<Exception>(); |
|||
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); |
|||
if (exceptions.Any()) |
|||
{ |
|||
ThrowOriginalExceptions(eventType, exceptions); |
|||
} |
|||
} |
|||
|
|||
protected override byte[] Serialize(object eventData) |
|||
{ |
|||
return _serializer.Serialize(eventData); |
|||
} |
|||
|
|||
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) |
|||
{ |
|||
var handlerFactories = GetOrCreateHandlerFactories(eventType); |
|||
|
|||
if (factory.IsInFactories(handlerFactories)) |
|||
{ |
|||
return NullDisposable.Instance; |
|||
} |
|||
|
|||
handlerFactories.Add(factory); |
|||
|
|||
return new EventHandlerFactoryUnregistrar(this, eventType, factory); |
|||
} |
|||
|
|||
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action) |
|||
{ |
|||
Check.NotNull(action, nameof(action)); |
|||
|
|||
GetOrCreateHandlerFactories(typeof(TEvent)) |
|||
.Locking(factories => |
|||
{ |
|||
factories.RemoveAll( |
|||
factory => |
|||
{ |
|||
var singleInstanceFactory = factory as SingleInstanceHandlerFactory; |
|||
if (singleInstanceFactory == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>; |
|||
if (actionHandler == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
return actionHandler.Action == action; |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
public override void Unsubscribe(Type eventType, IEventHandler handler) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType) |
|||
.Locking(factories => |
|||
{ |
|||
factories.RemoveAll( |
|||
factory => |
|||
factory is SingleInstanceHandlerFactory handlerFactory && |
|||
handlerFactory.HandlerInstance == handler |
|||
); |
|||
}); |
|||
} |
|||
|
|||
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType) |
|||
.Locking(factories => factories.Remove(factory)); |
|||
} |
|||
|
|||
public override void UnsubscribeAll(Type eventType) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType) |
|||
.Locking(factories => factories.Clear()); |
|||
} |
|||
|
|||
protected override async Task PublishToEventBusAsync(Type eventType, object eventData) |
|||
{ |
|||
await PublishAsync(eventType, eventData); |
|||
} |
|||
|
|||
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) |
|||
{ |
|||
unitOfWork.AddOrReplaceDistributedEvent(eventRecord); |
|||
} |
|||
|
|||
protected virtual async Task PublishAsync(string eventName, object eventData) |
|||
{ |
|||
var body = _serializer.Serialize(eventData); |
|||
|
|||
var message = new ServiceBusMessage(body) |
|||
{ |
|||
Subject = eventName |
|||
}; |
|||
|
|||
var publisher = await _publisherPool.GetAsync( |
|||
_options.TopicName, |
|||
_options.ConnectionName); |
|||
|
|||
await publisher.SendMessageAsync(message); |
|||
} |
|||
|
|||
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType) |
|||
{ |
|||
return _handlerFactories |
|||
.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) |
|||
.Select(handlerFactory => |
|||
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)) |
|||
.ToArray(); |
|||
} |
|||
|
|||
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) |
|||
{ |
|||
return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); |
|||
} |
|||
|
|||
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) |
|||
{ |
|||
return _handlerFactories.GetOrAdd( |
|||
eventType, |
|||
type => |
|||
{ |
|||
var eventName = EventNameAttribute.GetNameOrDefault(type); |
|||
_eventTypes[eventName] = type; |
|||
return new List<IEventHandlerFactory>(); |
|||
} |
|||
); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue