diff --git a/framework/Volo.Abp.sln b/framework/Volo.Abp.sln index 67d2fa2573..56fd40cee3 100644 --- a/framework/Volo.Abp.sln +++ b/framework/Volo.Abp.sln @@ -383,6 +383,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Compone EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions", "src\Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions\Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions.csproj", "{E9CE58DB-0789-4D18-8B63-474F7D7B14B4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.AzureServiceBus", "src\Volo.Abp.AzureServiceBus\Volo.Abp.AzureServiceBus.csproj", "{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1141,6 +1143,10 @@ Global {E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Debug|Any CPU.Build.0 = Debug|Any CPU {E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Release|Any CPU.ActiveCfg = Release|Any CPU {E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Release|Any CPU.Build.0 = Release|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1334,6 +1340,7 @@ Global {863C18F9-2407-49F9-9ADC-F6229AF3B385} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {B4B6B7DE-9798-4007-B1DF-7EE7929E392A} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {E9CE58DB-0789-4D18-8B63-474F7D7B14B4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} + {808EC18E-C8CC-4F5C-82B6-984EADBBF85D} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5} diff --git a/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml new file mode 100644 index 0000000000..be0de3a908 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd new file mode 100644 index 0000000000..3f3946e282 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj b/framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj new file mode 100644 index 0000000000..77ccf8f772 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj @@ -0,0 +1,17 @@ + + + + + + + netstandard2.0 + latest + + + + + + + + + diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs new file mode 100644 index 0000000000..e9d1c20cd3 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Json; +using Volo.Abp.Modularity; +using Volo.Abp.Threading; + +namespace Volo.Abp.AzureServiceBus +{ + [DependsOn( + typeof(AbpJsonModule), + typeof(AbpThreadingModule) + )] + public class AbpAzureServiceBusModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + var configuration = context.Services.GetConfiguration(); + Configure(configuration.GetSection("Azure:ServiceBus")); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs new file mode 100644 index 0000000000..7672502d28 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs @@ -0,0 +1,12 @@ +namespace Volo.Abp.AzureServiceBus +{ + public class AbpAzureServiceBusOptions + { + public AzureServiceBusConnections Connections { get; } + + public AbpAzureServiceBusOptions() + { + Connections = new AzureServiceBusConnections(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs new file mode 100644 index 0000000000..f8b0683d96 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using JetBrains.Annotations; + +namespace Volo.Abp.AzureServiceBus +{ + [Serializable] + public class AzureServiceBusConnections : Dictionary + { + public const string DefaultConnectionName = "Default"; + + [NotNull] + public ClientConfig Default + { + get => this[DefaultConnectionName]; + set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value)); + } + + public AzureServiceBusConnections() + { + Default = new ClientConfig(); + } + + public ClientConfig GetOrDefault(string connectionName) + { + return TryGetValue(connectionName, out var connectionFactory) + ? connectionFactory + : Default; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs new file mode 100644 index 0000000000..6eeb8f7693 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs @@ -0,0 +1,104 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using Volo.Abp.ExceptionHandling; +using Volo.Abp.Threading; + +namespace Volo.Abp.AzureServiceBus +{ + public class AzureServiceBusMessageConsumer : IAzureServiceBusMessageConsumer, ITransientDependency + { + public ILogger Logger { get; set; } + + private readonly IExceptionNotifier _exceptionNotifier; + private readonly IProcessorPool _processorPool; + private readonly ConcurrentBag> _callbacks; + private string _connectionName; + private string _subscriptionName; + private string _topicName; + + public AzureServiceBusMessageConsumer( + IExceptionNotifier exceptionNotifier, + IProcessorPool processorPool) + { + _exceptionNotifier = exceptionNotifier; + _processorPool = processorPool; + Logger = NullLogger.Instance; + _callbacks = new ConcurrentBag>(); + } + + public virtual void Initialize( + [NotNull] string topicName, + [NotNull] string subscriptionName, + string connectionName) + { + Check.NotNull(topicName, nameof(topicName)); + Check.NotNull(subscriptionName, nameof(subscriptionName)); + + _topicName = topicName; + _connectionName = connectionName ?? AzureServiceBusConnections.DefaultConnectionName; + _subscriptionName = subscriptionName; + StartProcessing(); + } + + public void OnMessageReceived(Func callback) + { + _callbacks.Add(callback); + } + + protected virtual void StartProcessing() + { + Task.Factory.StartNew(function: async () => + { + var serviceBusProcessor = await _processorPool.GetAsync(_subscriptionName, _topicName, _connectionName); + serviceBusProcessor.ProcessErrorAsync += HandleIncomingError; + serviceBusProcessor.ProcessMessageAsync += HandleIncomingMessage; + + if (!serviceBusProcessor.IsProcessing) + { + await serviceBusProcessor.StartProcessingAsync(); + } + + while (true) + { + Thread.Sleep(1000); + } + }, TaskCreationOptions.LongRunning); + } + + protected virtual async Task HandleIncomingMessage(ProcessMessageEventArgs args) + { + try + { + foreach (var callback in _callbacks) + { + await callback(args.Message); + } + + await args.CompleteMessageAsync(args.Message); + } + catch (Exception exception) + { + await HandleError(exception); + } + } + + protected virtual async Task HandleIncomingError(ProcessErrorEventArgs args) + { + await HandleError(args.Exception); + } + + protected virtual async Task HandleError(Exception exception) + { + Logger.LogException(exception); + await _exceptionNotifier.NotifyAsync(exception); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs new file mode 100644 index 0000000000..d373de43cf --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class AzureServiceBusMessageConsumerFactory : IAzureServiceBusMessageConsumerFactory, ISingletonDependency, IDisposable + { + protected IServiceScope ServiceScope { get; } + + public AzureServiceBusMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory) + { + ServiceScope = serviceScopeFactory.CreateScope(); + } + + public IAzureServiceBusMessageConsumer CreateMessageConsumer(string topicName, string subscriptionName, string connectionName) + { + var processor = ServiceScope.ServiceProvider.GetRequiredService(); + processor.Initialize(topicName, subscriptionName, connectionName); + return processor; + } + + public void Dispose() + { + ServiceScope?.Dispose(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs new file mode 100644 index 0000000000..9a4c8c2856 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs @@ -0,0 +1,16 @@ +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; + +namespace Volo.Abp.AzureServiceBus +{ + public class ClientConfig + { + public string ConnectionString { get; set; } + + public ServiceBusAdministrationClientOptions Admin { get; set; } = new(); + + public ServiceBusClientOptions Client { get; set; } = new(); + + public ServiceBusProcessorOptions Processor { get; set; } = new(); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs new file mode 100644 index 0000000000..4f38250dec --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class ConnectionPool : IConnectionPool, ISingletonDependency + { + public ILogger Logger { get; set; } + + private bool _isDisposed; + private readonly AbpAzureServiceBusOptions _options; + private readonly ConcurrentDictionary> _clients; + private readonly ConcurrentDictionary> _adminClients; + + public ConnectionPool(IOptions options) + { + _options = options.Value; + _clients = new ConcurrentDictionary>(); + _adminClients = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public ServiceBusClient GetClient(string connectionName) + { + connectionName ??= AzureServiceBusConnections.DefaultConnectionName; + return _clients.GetOrAdd( + connectionName, new Lazy(() => + { + var config = _options.Connections.GetOrDefault(connectionName); + return new ServiceBusClient(config.ConnectionString, config.Client); + }) + ).Value; + } + + public ServiceBusAdministrationClient GetAdministrationClient(string connectionName) + { + connectionName ??= AzureServiceBusConnections.DefaultConnectionName; + return _adminClients.GetOrAdd( + connectionName, new Lazy(() => + { + var config = _options.Connections.GetOrDefault(connectionName); + return new ServiceBusAdministrationClient(config.ConnectionString); + }) + ).Value; + } + + public async ValueTask DisposeAsync() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + if (!_clients.Any()) + { + Logger.LogDebug($"Disposed connection pool with no connection in the pool."); + return; + } + + Logger.LogInformation($"Disposing connection pool ({_clients.Count} connections)."); + + foreach (var connection in _clients.Values) + { + await connection.Value.DisposeAsync(); + } + + _clients.Clear(); + + if (!_adminClients.Any()) + { + Logger.LogDebug($"Disposed admin connection pool with no admin connection in the pool."); + return; + } + + Logger.LogInformation($"Disposing admin connection pool ({_adminClients.Count} admin connections)."); + _adminClients.Clear(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs new file mode 100644 index 0000000000..7a47e42e40 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IAzureServiceBusMessageConsumer + { + void OnMessageReceived(Func callback); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs new file mode 100644 index 0000000000..0f9f20f0aa --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs @@ -0,0 +1,21 @@ +using System.Threading.Tasks; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IAzureServiceBusMessageConsumerFactory + { + /// + /// Creates a new . + /// Avoid to create too many consumers since they are + /// not disposed until end of the application. + /// + /// + /// + /// + /// + IAzureServiceBusMessageConsumer CreateMessageConsumer( + string topicName, + string subscriptionName, + string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs new file mode 100644 index 0000000000..04f56d4470 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs @@ -0,0 +1,11 @@ +using System; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IAzureServiceBusSerializer + { + byte[] Serialize(object obj); + + object Deserialize(BinaryData value, Type type); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs new file mode 100644 index 0000000000..43ed188022 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs @@ -0,0 +1,13 @@ +using System; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IConnectionPool : IAsyncDisposable + { + ServiceBusClient GetClient(string connectionName); + + ServiceBusAdministrationClient GetAdministrationClient(string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs new file mode 100644 index 0000000000..338bee393c --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IProcessorPool : IAsyncDisposable + { + Task GetAsync(string subscriptionName, string topicName, string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs new file mode 100644 index 0000000000..9ae141a252 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Volo.Abp.AzureServiceBus +{ + public interface IPublisherPool : IAsyncDisposable + { + Task GetAsync(string topicName, string connectionName); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs new file mode 100644 index 0000000000..daa804e930 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class ProcessorPool : IProcessorPool, ISingletonDependency + { + public ILogger Logger { get; set; } + + private bool _isDisposed; + private readonly AbpAzureServiceBusOptions _options; + private readonly IConnectionPool _connectionPool; + private readonly ConcurrentDictionary> _processors; + + public ProcessorPool( + IOptions options, + IConnectionPool connectionPool) + { + _options = options.Value; + _connectionPool = connectionPool; + _processors = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public async Task GetAsync(string subscriptionName, string topicName, string connectionName) + { + var admin = _connectionPool.GetAdministrationClient(connectionName); + await admin.SetupSubscriptionAsync(topicName, subscriptionName); + + return _processors.GetOrAdd( + $"{topicName}-{subscriptionName}", new Lazy(() => + { + var config = _options.Connections.GetOrDefault(connectionName); + var client = _connectionPool.GetClient(connectionName); + return client.CreateProcessor(topicName, subscriptionName, config.Processor); + }) + ).Value; + } + + public async ValueTask DisposeAsync() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + if (!_processors.Any()) + { + Logger.LogDebug($"Disposed processor pool with no processors in the pool."); + return; + } + + Logger.LogInformation($"Disposing processor pool ({_processors.Count} processors)."); + + foreach (var item in _processors.Values) + { + var processor = item.Value; + if (processor.IsProcessing) + { + await processor.StopProcessingAsync(); + } + + if (!processor.IsClosed) + { + await processor.CloseAsync(); + } + + await processor.DisposeAsync(); + } + + _processors.Clear(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs new file mode 100644 index 0000000000..57008e07cb --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs @@ -0,0 +1,66 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.AzureServiceBus +{ + public class PublisherPool : IPublisherPool, ISingletonDependency + { + public ILogger Logger { get; set; } + + private bool _isDisposed; + private readonly IConnectionPool _connectionPool; + private readonly ConcurrentDictionary> _publishers; + + public PublisherPool(IConnectionPool connectionPool) + { + _connectionPool = connectionPool; + _publishers = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public async Task GetAsync(string topicName, string connectionName) + { + var admin = _connectionPool.GetAdministrationClient(connectionName); + await admin.SetupTopicAsync(topicName); + + return _publishers.GetOrAdd( + topicName, new Lazy(() => + { + var client = _connectionPool.GetClient(connectionName); + return client.CreateSender(topicName); + }) + ).Value; + } + + public async ValueTask DisposeAsync() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + if (!_publishers.Any()) + { + Logger.LogDebug($"Disposed publisher pool with no publisher in the pool."); + return; + } + + Logger.LogInformation($"Disposing publisher pool ({_publishers.Count} publishers)."); + + foreach (var publisher in _publishers.Values) + { + await publisher.Value.CloseAsync(); + await publisher.Value.DisposeAsync(); + } + + _publishers.Clear(); + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs new file mode 100644 index 0000000000..c6a09a155b --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs @@ -0,0 +1,25 @@ +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; + +namespace Volo.Abp.AzureServiceBus +{ + public static class ServiceBusAdministrationClientExtensions + { + public static async Task SetupTopicAsync(this ServiceBusAdministrationClient client, string topicName) + { + if (!await client.TopicExistsAsync(topicName)) + { + await client.CreateTopicAsync(topicName); + } + } + + public static async Task SetupSubscriptionAsync(this ServiceBusAdministrationClient client, string topicName, string subscriptionName) + { + await client.SetupTopicAsync(topicName); + if (!await client.SubscriptionExistsAsync(topicName, subscriptionName)) + { + await client.CreateSubscriptionAsync(topicName, subscriptionName); + } + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs new file mode 100644 index 0000000000..0a87e22934 --- /dev/null +++ b/framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs @@ -0,0 +1,27 @@ +using System; +using System.Text; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Json; + +namespace Volo.Abp.AzureServiceBus +{ + public class Utf8JsonAzureServiceBusSerializer : IAzureServiceBusSerializer, ITransientDependency + { + private readonly IJsonSerializer _jsonSerializer; + + public Utf8JsonAzureServiceBusSerializer(IJsonSerializer jsonSerializer) + { + _jsonSerializer = jsonSerializer; + } + + public byte[] Serialize(object obj) + { + return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); + } + + public object Deserialize(BinaryData value, Type type) + { + return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value.ToArray())); + } + } +} \ No newline at end of file