Browse Source

Add Azure Service Bus module

Added an ABP module for Azure Service Bus that implements common
configuration, services, and factories that can be reused in any module.

Added extension methods to the ServiceBusAdministrationClient to check
if a topic and subscription exist and if not create it.

Improved the performance of publishing and processing messages by
creating a pool for ServiceBusAdministrationClient, ServiceBusClient,
ServiceBusSender and ServiceBusProcessor.
pull/8375/head
Gideon de Swardt 5 years ago
parent
commit
5abd71f6c7
  1. 7
      framework/Volo.Abp.sln
  2. 3
      framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml
  3. 30
      framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd
  4. 17
      framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj
  5. 20
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs
  6. 12
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs
  7. 31
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs
  8. 104
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs
  9. 29
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs
  10. 16
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs
  11. 88
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs
  12. 11
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs
  13. 21
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs
  14. 11
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs
  15. 13
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs
  16. 11
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs
  17. 11
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs
  18. 82
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs
  19. 66
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs
  20. 25
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs
  21. 27
      framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs

7
framework/Volo.Abp.sln

@ -383,6 +383,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Compone
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions", "src\Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions\Volo.Abp.AspNetCore.Mvc.UI.Bundling.Abstractions.csproj", "{E9CE58DB-0789-4D18-8B63-474F7D7B14B4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.AzureServiceBus", "src\Volo.Abp.AzureServiceBus\Volo.Abp.AzureServiceBus.csproj", "{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -1141,6 +1143,10 @@ Global
{E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E9CE58DB-0789-4D18-8B63-474F7D7B14B4}.Release|Any CPU.Build.0 = Release|Any CPU
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -1334,6 +1340,7 @@ Global
{863C18F9-2407-49F9-9ADC-F6229AF3B385} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{B4B6B7DE-9798-4007-B1DF-7EE7929E392A} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{E9CE58DB-0789-4D18-8B63-474F7D7B14B4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{808EC18E-C8CC-4F5C-82B6-984EADBBF85D} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5}

3
framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xml

@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>

30
framework/src/Volo.Abp.AzureServiceBus/FodyWeavers.xsd

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

17
framework/src/Volo.Abp.AzureServiceBus/Volo.Abp.AzureServiceBus.csproj

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.1.1" />
<ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" />
<ProjectReference Include="..\Volo.Abp.Threading\Volo.Abp.Threading.csproj" />
</ItemGroup>
</Project>

20
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusModule.cs

@ -0,0 +1,20 @@
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Json;
using Volo.Abp.Modularity;
using Volo.Abp.Threading;
namespace Volo.Abp.AzureServiceBus
{
[DependsOn(
typeof(AbpJsonModule),
typeof(AbpThreadingModule)
)]
public class AbpAzureServiceBusModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<AbpAzureServiceBusOptions>(configuration.GetSection("Azure:ServiceBus"));
}
}
}

12
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AbpAzureServiceBusOptions.cs

@ -0,0 +1,12 @@
namespace Volo.Abp.AzureServiceBus
{
public class AbpAzureServiceBusOptions
{
public AzureServiceBusConnections Connections { get; }
public AbpAzureServiceBusOptions()
{
Connections = new AzureServiceBusConnections();
}
}
}

31
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusConnections.cs

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using JetBrains.Annotations;
namespace Volo.Abp.AzureServiceBus
{
[Serializable]
public class AzureServiceBusConnections : Dictionary<string, ClientConfig>
{
public const string DefaultConnectionName = "Default";
[NotNull]
public ClientConfig Default
{
get => this[DefaultConnectionName];
set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value));
}
public AzureServiceBusConnections()
{
Default = new ClientConfig();
}
public ClientConfig GetOrDefault(string connectionName)
{
return TryGetValue(connectionName, out var connectionFactory)
? connectionFactory
: Default;
}
}
}

104
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumer.cs

@ -0,0 +1,104 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Threading;
namespace Volo.Abp.AzureServiceBus
{
public class AzureServiceBusMessageConsumer : IAzureServiceBusMessageConsumer, ITransientDependency
{
public ILogger<AzureServiceBusMessageConsumer> Logger { get; set; }
private readonly IExceptionNotifier _exceptionNotifier;
private readonly IProcessorPool _processorPool;
private readonly ConcurrentBag<Func<ServiceBusReceivedMessage, Task>> _callbacks;
private string _connectionName;
private string _subscriptionName;
private string _topicName;
public AzureServiceBusMessageConsumer(
IExceptionNotifier exceptionNotifier,
IProcessorPool processorPool)
{
_exceptionNotifier = exceptionNotifier;
_processorPool = processorPool;
Logger = NullLogger<AzureServiceBusMessageConsumer>.Instance;
_callbacks = new ConcurrentBag<Func<ServiceBusReceivedMessage, Task>>();
}
public virtual void Initialize(
[NotNull] string topicName,
[NotNull] string subscriptionName,
string connectionName)
{
Check.NotNull(topicName, nameof(topicName));
Check.NotNull(subscriptionName, nameof(subscriptionName));
_topicName = topicName;
_connectionName = connectionName ?? AzureServiceBusConnections.DefaultConnectionName;
_subscriptionName = subscriptionName;
StartProcessing();
}
public void OnMessageReceived(Func<ServiceBusReceivedMessage, Task> callback)
{
_callbacks.Add(callback);
}
protected virtual void StartProcessing()
{
Task.Factory.StartNew(function: async () =>
{
var serviceBusProcessor = await _processorPool.GetAsync(_subscriptionName, _topicName, _connectionName);
serviceBusProcessor.ProcessErrorAsync += HandleIncomingError;
serviceBusProcessor.ProcessMessageAsync += HandleIncomingMessage;
if (!serviceBusProcessor.IsProcessing)
{
await serviceBusProcessor.StartProcessingAsync();
}
while (true)
{
Thread.Sleep(1000);
}
}, TaskCreationOptions.LongRunning);
}
protected virtual async Task HandleIncomingMessage(ProcessMessageEventArgs args)
{
try
{
foreach (var callback in _callbacks)
{
await callback(args.Message);
}
await args.CompleteMessageAsync(args.Message);
}
catch (Exception exception)
{
await HandleError(exception);
}
}
protected virtual async Task HandleIncomingError(ProcessErrorEventArgs args)
{
await HandleError(args.Exception);
}
protected virtual async Task HandleError(Exception exception)
{
Logger.LogException(exception);
await _exceptionNotifier.NotifyAsync(exception);
}
}
}

29
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/AzureServiceBusMessageConsumerFactory.cs

@ -0,0 +1,29 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.AzureServiceBus
{
public class AzureServiceBusMessageConsumerFactory : IAzureServiceBusMessageConsumerFactory, ISingletonDependency, IDisposable
{
protected IServiceScope ServiceScope { get; }
public AzureServiceBusMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory)
{
ServiceScope = serviceScopeFactory.CreateScope();
}
public IAzureServiceBusMessageConsumer CreateMessageConsumer(string topicName, string subscriptionName, string connectionName)
{
var processor = ServiceScope.ServiceProvider.GetRequiredService<AzureServiceBusMessageConsumer>();
processor.Initialize(topicName, subscriptionName, connectionName);
return processor;
}
public void Dispose()
{
ServiceScope?.Dispose();
}
}
}

16
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ClientConfig.cs

@ -0,0 +1,16 @@
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
namespace Volo.Abp.AzureServiceBus
{
public class ClientConfig
{
public string ConnectionString { get; set; }
public ServiceBusAdministrationClientOptions Admin { get; set; } = new();
public ServiceBusClientOptions Client { get; set; } = new();
public ServiceBusProcessorOptions Processor { get; set; } = new();
}
}

88
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ConnectionPool.cs

@ -0,0 +1,88 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.AzureServiceBus
{
public class ConnectionPool : IConnectionPool, ISingletonDependency
{
public ILogger<ConnectionPool> Logger { get; set; }
private bool _isDisposed;
private readonly AbpAzureServiceBusOptions _options;
private readonly ConcurrentDictionary<string, Lazy<ServiceBusClient>> _clients;
private readonly ConcurrentDictionary<string, Lazy<ServiceBusAdministrationClient>> _adminClients;
public ConnectionPool(IOptions<AbpAzureServiceBusOptions> options)
{
_options = options.Value;
_clients = new ConcurrentDictionary<string, Lazy<ServiceBusClient>>();
_adminClients = new ConcurrentDictionary<string, Lazy<ServiceBusAdministrationClient>>();
Logger = new NullLogger<ConnectionPool>();
}
public ServiceBusClient GetClient(string connectionName)
{
connectionName ??= AzureServiceBusConnections.DefaultConnectionName;
return _clients.GetOrAdd(
connectionName, new Lazy<ServiceBusClient>(() =>
{
var config = _options.Connections.GetOrDefault(connectionName);
return new ServiceBusClient(config.ConnectionString, config.Client);
})
).Value;
}
public ServiceBusAdministrationClient GetAdministrationClient(string connectionName)
{
connectionName ??= AzureServiceBusConnections.DefaultConnectionName;
return _adminClients.GetOrAdd(
connectionName, new Lazy<ServiceBusAdministrationClient>(() =>
{
var config = _options.Connections.GetOrDefault(connectionName);
return new ServiceBusAdministrationClient(config.ConnectionString);
})
).Value;
}
public async ValueTask DisposeAsync()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!_clients.Any())
{
Logger.LogDebug($"Disposed connection pool with no connection in the pool.");
return;
}
Logger.LogInformation($"Disposing connection pool ({_clients.Count} connections).");
foreach (var connection in _clients.Values)
{
await connection.Value.DisposeAsync();
}
_clients.Clear();
if (!_adminClients.Any())
{
Logger.LogDebug($"Disposed admin connection pool with no admin connection in the pool.");
return;
}
Logger.LogInformation($"Disposing admin connection pool ({_adminClients.Count} admin connections).");
_adminClients.Clear();
}
}
}

11
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumer.cs

@ -0,0 +1,11 @@
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace Volo.Abp.AzureServiceBus
{
public interface IAzureServiceBusMessageConsumer
{
void OnMessageReceived(Func<ServiceBusReceivedMessage, Task> callback);
}
}

21
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusMessageConsumerFactory.cs

@ -0,0 +1,21 @@
using System.Threading.Tasks;
namespace Volo.Abp.AzureServiceBus
{
public interface IAzureServiceBusMessageConsumerFactory
{
/// <summary>
/// Creates a new <see cref="IAzureServiceBusMessageConsumerFactory"/>.
/// Avoid to create too many consumers since they are
/// not disposed until end of the application.
/// </summary>
/// <param name="topicName"></param>
/// <param name="subscriptionName"></param>
/// <param name="connectionName"></param>
/// <returns></returns>
IAzureServiceBusMessageConsumer CreateMessageConsumer(
string topicName,
string subscriptionName,
string connectionName);
}
}

11
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IAzureServiceBusSerializer.cs

@ -0,0 +1,11 @@
using System;
namespace Volo.Abp.AzureServiceBus
{
public interface IAzureServiceBusSerializer
{
byte[] Serialize(object obj);
object Deserialize(BinaryData value, Type type);
}
}

13
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IConnectionPool.cs

@ -0,0 +1,13 @@
using System;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
namespace Volo.Abp.AzureServiceBus
{
public interface IConnectionPool : IAsyncDisposable
{
ServiceBusClient GetClient(string connectionName);
ServiceBusAdministrationClient GetAdministrationClient(string connectionName);
}
}

11
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IProcessorPool.cs

@ -0,0 +1,11 @@
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace Volo.Abp.AzureServiceBus
{
public interface IProcessorPool : IAsyncDisposable
{
Task<ServiceBusProcessor> GetAsync(string subscriptionName, string topicName, string connectionName);
}
}

11
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/IPublisherPool.cs

@ -0,0 +1,11 @@
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace Volo.Abp.AzureServiceBus
{
public interface IPublisherPool : IAsyncDisposable
{
Task<ServiceBusSender> GetAsync(string topicName, string connectionName);
}
}

82
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ProcessorPool.cs

@ -0,0 +1,82 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.AzureServiceBus
{
public class ProcessorPool : IProcessorPool, ISingletonDependency
{
public ILogger<ProcessorPool> Logger { get; set; }
private bool _isDisposed;
private readonly AbpAzureServiceBusOptions _options;
private readonly IConnectionPool _connectionPool;
private readonly ConcurrentDictionary<string, Lazy<ServiceBusProcessor>> _processors;
public ProcessorPool(
IOptions<AbpAzureServiceBusOptions> options,
IConnectionPool connectionPool)
{
_options = options.Value;
_connectionPool = connectionPool;
_processors = new ConcurrentDictionary<string, Lazy<ServiceBusProcessor>>();
Logger = new NullLogger<ProcessorPool>();
}
public async Task<ServiceBusProcessor> GetAsync(string subscriptionName, string topicName, string connectionName)
{
var admin = _connectionPool.GetAdministrationClient(connectionName);
await admin.SetupSubscriptionAsync(topicName, subscriptionName);
return _processors.GetOrAdd(
$"{topicName}-{subscriptionName}", new Lazy<ServiceBusProcessor>(() =>
{
var config = _options.Connections.GetOrDefault(connectionName);
var client = _connectionPool.GetClient(connectionName);
return client.CreateProcessor(topicName, subscriptionName, config.Processor);
})
).Value;
}
public async ValueTask DisposeAsync()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!_processors.Any())
{
Logger.LogDebug($"Disposed processor pool with no processors in the pool.");
return;
}
Logger.LogInformation($"Disposing processor pool ({_processors.Count} processors).");
foreach (var item in _processors.Values)
{
var processor = item.Value;
if (processor.IsProcessing)
{
await processor.StopProcessingAsync();
}
if (!processor.IsClosed)
{
await processor.CloseAsync();
}
await processor.DisposeAsync();
}
_processors.Clear();
}
}
}

66
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/PublisherPool.cs

@ -0,0 +1,66 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.AzureServiceBus
{
public class PublisherPool : IPublisherPool, ISingletonDependency
{
public ILogger<PublisherPool> Logger { get; set; }
private bool _isDisposed;
private readonly IConnectionPool _connectionPool;
private readonly ConcurrentDictionary<string, Lazy<ServiceBusSender>> _publishers;
public PublisherPool(IConnectionPool connectionPool)
{
_connectionPool = connectionPool;
_publishers = new ConcurrentDictionary<string, Lazy<ServiceBusSender>>();
Logger = new NullLogger<PublisherPool>();
}
public async Task<ServiceBusSender> GetAsync(string topicName, string connectionName)
{
var admin = _connectionPool.GetAdministrationClient(connectionName);
await admin.SetupTopicAsync(topicName);
return _publishers.GetOrAdd(
topicName, new Lazy<ServiceBusSender>(() =>
{
var client = _connectionPool.GetClient(connectionName);
return client.CreateSender(topicName);
})
).Value;
}
public async ValueTask DisposeAsync()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!_publishers.Any())
{
Logger.LogDebug($"Disposed publisher pool with no publisher in the pool.");
return;
}
Logger.LogInformation($"Disposing publisher pool ({_publishers.Count} publishers).");
foreach (var publisher in _publishers.Values)
{
await publisher.Value.CloseAsync();
await publisher.Value.DisposeAsync();
}
_publishers.Clear();
}
}
}

25
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/ServiceBusAdministrationClientExtensions.cs

@ -0,0 +1,25 @@
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
namespace Volo.Abp.AzureServiceBus
{
public static class ServiceBusAdministrationClientExtensions
{
public static async Task SetupTopicAsync(this ServiceBusAdministrationClient client, string topicName)
{
if (!await client.TopicExistsAsync(topicName))
{
await client.CreateTopicAsync(topicName);
}
}
public static async Task SetupSubscriptionAsync(this ServiceBusAdministrationClient client, string topicName, string subscriptionName)
{
await client.SetupTopicAsync(topicName);
if (!await client.SubscriptionExistsAsync(topicName, subscriptionName))
{
await client.CreateSubscriptionAsync(topicName, subscriptionName);
}
}
}
}

27
framework/src/Volo.Abp.AzureServiceBus/Volo/Abp/AzureServiceBus/Utf8JsonAzureServiceBusSerializer.cs

@ -0,0 +1,27 @@
using System;
using System.Text;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Json;
namespace Volo.Abp.AzureServiceBus
{
public class Utf8JsonAzureServiceBusSerializer : IAzureServiceBusSerializer, ITransientDependency
{
private readonly IJsonSerializer _jsonSerializer;
public Utf8JsonAzureServiceBusSerializer(IJsonSerializer jsonSerializer)
{
_jsonSerializer = jsonSerializer;
}
public byte[] Serialize(object obj)
{
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
}
public object Deserialize(BinaryData value, Type type)
{
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value.ToArray()));
}
}
}
Loading…
Cancel
Save