From 5abd71f6c7691f63f66bca3589e32aeab7198547 Mon Sep 17 00:00:00 2001 From: Gideon de Swardt Date: Sat, 3 Apr 2021 22:32:37 +0100 Subject: [PATCH] Add Azure Service Bus module Added an ABP module for Azure Service Bus that implements common configuration, services, and factories that can be reused in any module. Added extension methods to the ServiceBusAdministrationClient to check if a topic and subscription exist and if not create it. Improved the performance of publishing and processing messages by creating a pool for ServiceBusAdministrationClient, ServiceBusClient, ServiceBusSender and ServiceBusProcessor. --- framework/Volo.Abp.sln | 7 ++ .../Volo.Abp.AzureServiceBus/FodyWeavers.xml | 3 + .../Volo.Abp.AzureServiceBus/FodyWeavers.xsd | 30 +++++ .../Volo.Abp.AzureServiceBus.csproj | 17 +++ .../AbpAzureServiceBusModule.cs | 20 ++++ .../AbpAzureServiceBusOptions.cs | 12 ++ .../AzureServiceBusConnections.cs | 31 ++++++ .../AzureServiceBusMessageConsumer.cs | 104 ++++++++++++++++++ .../AzureServiceBusMessageConsumerFactory.cs | 29 +++++ .../Volo/Abp/AzureServiceBus/ClientConfig.cs | 16 +++ .../Abp/AzureServiceBus/ConnectionPool.cs | 88 +++++++++++++++ .../IAzureServiceBusMessageConsumer.cs | 11 ++ .../IAzureServiceBusMessageConsumerFactory.cs | 21 ++++ .../IAzureServiceBusSerializer.cs | 11 ++ .../Abp/AzureServiceBus/IConnectionPool.cs | 13 +++ .../Abp/AzureServiceBus/IProcessorPool.cs | 11 ++ .../Abp/AzureServiceBus/IPublisherPool.cs | 11 ++ .../Volo/Abp/AzureServiceBus/ProcessorPool.cs | 82 ++++++++++++++ .../Volo/Abp/AzureServiceBus/PublisherPool.cs | 66 +++++++++++ ...erviceBusAdministrationClientExtensions.cs | 25 +++++ .../Utf8JsonAzureServiceBusSerializer.cs | 27 +++++ 21 files changed, 635 insertions(+) create mode 100644 framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml create mode 100644 framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs create mode 100644 framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs diff --git a/framework/Volo.Abp.sln b/framework/Volo.Abp.sln index 67d2fa2573..56fd40cee3 100644 --- a/framework/Volo.Abp.sln +++ b/framework/Volo.Abp.sln @@ -383,6 +383,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Compone EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions", "src\Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions\Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions.csproj", "{E9CE58DB-0789-4D18-8B63-474F7D7B14B4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.AzureServiceBus", "src\Volo.Abp.AzureServiceBus\Volo.Abp.AzureServiceBus.csproj", "{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1141,6 +1143,10 @@ Global {E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Debug|Any CPU.Build.0 = Debug|Any CPU {E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Release|Any CPU.ActiveCfg = Release|Any CPU {E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Release|Any CPU.Build.0 = Release|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1334,6 +1340,7 @@ Global {863C18F9-2407-49F9-9ADC-F6229AF3B385} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {B4B6B7DE-9798-4007-B1DF-7EE7929E392A} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {E9CE58DB-0789-4D18-8B63-474F7D7B14B4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5} diff --git a/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml new file mode 100644 index 0000000000..be0de3a908 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd new file mode 100644 index 0000000000..3f3946e282 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj b/framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj new file mode 100644 index 0000000000..77ccf8f772 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj @@ -0,0 +1,17 @@ + + + + + + + netstandard2.0 + latest + + + + + + + + + diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs new file mode 100644 index 0000000000..e9d1c20cd3 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Json; +using Volo.Abp.Modularity; +using Volo.Abp.Threading; + +namespace Volo.Abp.AzureServiceBus +{ + [DependsOn( + typeof(AbpJsonModule), + typeof(AbpThreadingModule) + )] + public class AbpAzureServiceBusModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + var configuration = context.Services.GetConfiguration(); + Configure(configuration.GetSection("Azure:ServiceBus")); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs new file mode 100644 index 0000000000..7672502d28 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs @@ -0,0 +1,12 @@ +namespace Volo.Abp.AzureServiceBus +{ + public class AbpAzureServiceBusOptions + { + public AzureServiceBusConnections Connections { get; } + + public AbpAzureServiceBusOptions() + { + Connections = new AzureServiceBusConnections(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs new file mode 100644 index 0000000000..f8b0683d96 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using JetBrains.Annotations; + +namespace Volo.Abp.AzureServiceBus +{ + [Serializable] + public class AzureServiceBusConnections : Dictionary + { + public const string DefaultConnectionName = "Default"; + + [NotNull] + public ClientConfig Default + { + get => this[DefaultConnectionName]; + set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value)); + } + + public AzureServiceBusConnections() + { + Default = new ClientConfig(); + } + + public ClientConfig GetOrDefault(string connectionName) + { + return TryGetValue(connectionName, out var connectionFactory) + ? connectionFactory + : Default; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs new file mode 100644 index 0000000000..6eeb8f7693 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs @@ -0,0 +1,104 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using Volo.Abp.ExceptionHandling; +using Volo.Abp.Threading; + +namespace Volo.Abp.AzureServiceBus +{ + public class AzureServiceBusMessageConsumer : IAzureServiceBusMessageConsumer, ITransientDependency + { + public ILogger Logger { get; set; } + + private readonly IExceptionNotifier _exceptionNotifier; + private readonly IProcessorPool _processorPool; + private readonly ConcurrentBag> _callbacks; + private string _connectionName; + private string _subscriptionName; + private string _topicName; + + public AzureServiceBusMessageConsumer( + IExceptionNotifier exceptionNotifier, + IProcessorPool processorPool) + { + _exceptionNotifier = exceptionNotifier; + _processorPool = processorPool; + Logger = NullLogger.Instance; + _callbacks = new ConcurrentBag>(); + } + + public virtual void Initialize( + [NotNull] string topicName, + [NotNull] string subscriptionName, + string connectionName) + { + Check.NotNull(topicName, nameof(topicName)); + Check.NotNull(subscriptionName, nameof(subscriptionName)); + + _topicName = topicName; + _connectionName = connectionName ?? AzureServiceBusConnections.DefaultConnectionName; + _subscriptionName = subscriptionName; + StartProcessing(); + } + + public void OnMessageReceived(Func callback) + { + _callbacks.Add(callback); + } + + protected virtual void StartProcessing() + { + Task.Factory.StartNew(function: async () => + { + var serviceBusProcessor = await _processorPool.GetAsync(_subscriptionName, _topicName, _connectionName); + serviceBusProcessor.ProcessErrorAsync += HandleIncomingError; + serviceBusProcessor.ProcessMessageAsync += HandleIncomingMessage; + + if (!serviceBusProcessor.IsProcessing) + { + await serviceBusProcessor.StartProcessingAsync(); + } + + while (true) + { + Thread.Sleep(1000); + } + }, TaskCreationOptions.LongRunning); + } + + protected virtual async Task HandleIncomingMessage(ProcessMessageEventArgs args) + { + try + { + foreach (var callback in _callbacks) + { + await callback(args.Message); + } + + await args.CompleteMessageAsync(args.Message); + } + catch (Exception exception) + { + await HandleError(exception); + } + } + + protected virtual async Task HandleIncomingError(ProcessErrorEventArgs args) + { + await HandleError(args.Exception); + } + + protected virtual async Task HandleError(Exception exception) + { + Logger.LogException(exception); + await _exceptionNotifier.NotifyAsync(exception); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs new file mode 100644 index 0000000000..d373de43cf --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class AzureServiceBusMessageConsumerFactory : IAzureServiceBusMessageConsumerFactory, ISingletonDependency, IDisposable + { + protected IServiceScope ServiceScope { get; } + + public AzureServiceBusMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory) + { + ServiceScope = serviceScopeFactory.CreateScope(); + } + + public IAzureServiceBusMessageConsumer CreateMessageConsumer(string topicName, string subscriptionName, string connectionName) + { + var processor = ServiceScope.ServiceProvider.GetRequiredService(); + processor.Initialize(topicName, subscriptionName, connectionName); + return processor; + } + + public void Dispose() + { + ServiceScope?.Dispose(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs new file mode 100644 index 0000000000..9a4c8c2856 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs @@ -0,0 +1,16 @@ +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; + +namespace Volo.Abp.AzureServiceBus +{ + public class ClientConfig + { + public string ConnectionString { get; set; } + + public ServiceBusAdministrationClientOptions Admin { get; set; } = new(); + + public ServiceBusClientOptions Client { get; set; } = new(); + + public ServiceBusProcessorOptions Processor { get; set; } = new(); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs new file mode 100644 index 0000000000..4f38250dec --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class ConnectionPool : IConnectionPool, ISingletonDependency + { + public ILogger Logger { get; set; } + + private bool _isDisposed; + private readonly AbpAzureServiceBusOptions _options; + private readonly ConcurrentDictionary> _clients; + private readonly ConcurrentDictionary> _adminClients; + + public ConnectionPool(IOptions options) + { + _options = options.Value; + _clients = new ConcurrentDictionary>(); + _adminClients = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public ServiceBusClient GetClient(string connectionName) + { + connectionName ??= AzureServiceBusConnections.DefaultConnectionName; + return _clients.GetOrAdd( + connectionName, new Lazy(() => + { + var config = _options.Connections.GetOrDefault(connectionName); + return new ServiceBusClient(config.ConnectionString, config.Client); + }) + ).Value; + } + + public ServiceBusAdministrationClient GetAdministrationClient(string connectionName) + { + connectionName ??= AzureServiceBusConnections.DefaultConnectionName; + return _adminClients.GetOrAdd( + connectionName, new Lazy(() => + { + var config = _options.Connections.GetOrDefault(connectionName); + return new ServiceBusAdministrationClient(config.ConnectionString); + }) + ).Value; + } + + public async ValueTask DisposeAsync() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + if (!_clients.Any()) + { + Logger.LogDebug($"Disposed connection pool with no connection in the pool."); + return; + } + + Logger.LogInformation($"Disposing connection pool ({_clients.Count} connections)."); + + foreach (var connection in _clients.Values) + { + await connection.Value.DisposeAsync(); + } + + _clients.Clear(); + + if (!_adminClients.Any()) + { + Logger.LogDebug($"Disposed admin connection pool with no admin connection in the pool."); + return; + } + + Logger.LogInformation($"Disposing admin connection pool ({_adminClients.Count} admin connections)."); + _adminClients.Clear(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs new file mode 100644 index 0000000000..7a47e42e40 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IAzureServiceBusMessageConsumer + { + void OnMessageReceived(Func callback); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs new file mode 100644 index 0000000000..0f9f20f0aa --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs @@ -0,0 +1,21 @@ +using System.Threading.Tasks; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IAzureServiceBusMessageConsumerFactory + { + /// + /// Creates a new . + /// Avoid to create too many consumers since they are + /// not disposed until end of the application. + /// + /// + /// + /// + /// + IAzureServiceBusMessageConsumer CreateMessageConsumer( + string topicName, + string subscriptionName, + string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs new file mode 100644 index 0000000000..04f56d4470 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs @@ -0,0 +1,11 @@ +using System; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IAzureServiceBusSerializer + { + byte[] Serialize(object obj); + + object Deserialize(BinaryData value, Type type); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs new file mode 100644 index 0000000000..43ed188022 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs @@ -0,0 +1,13 @@ +using System; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IConnectionPool : IAsyncDisposable + { + ServiceBusClient GetClient(string connectionName); + + ServiceBusAdministrationClient GetAdministrationClient(string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs new file mode 100644 index 0000000000..338bee393c --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IProcessorPool : IAsyncDisposable + { + Task GetAsync(string subscriptionName, string topicName, string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs new file mode 100644 index 0000000000..9ae141a252 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IPublisherPool : IAsyncDisposable + { + Task GetAsync(string topicName, string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs new file mode 100644 index 0000000000..daa804e930 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class ProcessorPool : IProcessorPool, ISingletonDependency + { + public ILogger Logger { get; set; } + + private bool _isDisposed; + private readonly AbpAzureServiceBusOptions _options; + private readonly IConnectionPool _connectionPool; + private readonly ConcurrentDictionary> _processors; + + public ProcessorPool( + IOptions options, + IConnectionPool connectionPool) + { + _options = options.Value; + _connectionPool = connectionPool; + _processors = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public async Task GetAsync(string subscriptionName, string topicName, string connectionName) + { + var admin = _connectionPool.GetAdministrationClient(connectionName); + await admin.SetupSubscriptionAsync(topicName, subscriptionName); + + return _processors.GetOrAdd( + $"{topicName}-{subscriptionName}", new Lazy(() => + { + var config = _options.Connections.GetOrDefault(connectionName); + var client = _connectionPool.GetClient(connectionName); + return client.CreateProcessor(topicName, subscriptionName, config.Processor); + }) + ).Value; + } + + public async ValueTask DisposeAsync() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + if (!_processors.Any()) + { + Logger.LogDebug($"Disposed processor pool with no processors in the pool."); + return; + } + + Logger.LogInformation($"Disposing processor pool ({_processors.Count} processors)."); + + foreach (var item in _processors.Values) + { + var processor = item.Value; + if (processor.IsProcessing) + { + await processor.StopProcessingAsync(); + } + + if (!processor.IsClosed) + { + await processor.CloseAsync(); + } + + await processor.DisposeAsync(); + } + + _processors.Clear(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs new file mode 100644 index 0000000000..57008e07cb --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs @@ -0,0 +1,66 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class PublisherPool : IPublisherPool, ISingletonDependency + { + public ILogger Logger { get; set; } + + private bool _isDisposed; + private readonly IConnectionPool _connectionPool; + private readonly ConcurrentDictionary> _publishers; + + public PublisherPool(IConnectionPool connectionPool) + { + _connectionPool = connectionPool; + _publishers = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public async Task GetAsync(string topicName, string connectionName) + { + var admin = _connectionPool.GetAdministrationClient(connectionName); + await admin.SetupTopicAsync(topicName); + + return _publishers.GetOrAdd( + topicName, new Lazy(() => + { + var client = _connectionPool.GetClient(connectionName); + return client.CreateSender(topicName); + }) + ).Value; + } + + public async ValueTask DisposeAsync() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + if (!_publishers.Any()) + { + Logger.LogDebug($"Disposed publisher pool with no publisher in the pool."); + return; + } + + Logger.LogInformation($"Disposing publisher pool ({_publishers.Count} publishers)."); + + foreach (var publisher in _publishers.Values) + { + await publisher.Value.CloseAsync(); + await publisher.Value.DisposeAsync(); + } + + _publishers.Clear(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs new file mode 100644 index 0000000000..c6a09a155b --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs @@ -0,0 +1,25 @@ +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; + +namespace Volo.Abp.AzureServiceBus +{ + public static class ServiceBusAdministrationClientExtensions + { + public static async Task SetupTopicAsync(this ServiceBusAdministrationClient client, string topicName) + { + if (!await client.TopicExistsAsync(topicName)) + { + await client.CreateTopicAsync(topicName); + } + } + + public static async Task SetupSubscriptionAsync(this ServiceBusAdministrationClient client, string topicName, string subscriptionName) + { + await client.SetupTopicAsync(topicName); + if (!await client.SubscriptionExistsAsync(topicName, subscriptionName)) + { + await client.CreateSubscriptionAsync(topicName, subscriptionName); + } + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs new file mode 100644 index 0000000000..0a87e22934 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs @@ -0,0 +1,27 @@ +using System; +using System.Text; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Json; + +namespace Volo.Abp.AzureServiceBus +{ + public class Utf8JsonAzureServiceBusSerializer : IAzureServiceBusSerializer, ITransientDependency + { + private readonly IJsonSerializer _jsonSerializer; + + public Utf8JsonAzureServiceBusSerializer(IJsonSerializer jsonSerializer) + { + _jsonSerializer = jsonSerializer; + } + + public byte[] Serialize(object obj) + { + return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); + } + + public object Deserialize(BinaryData value, Type type) + { + return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value.ToArray())); + } + } +} \ No newline at end of file