Browse Source

Ensure that message queue processing completes regardless of whether the message was successfully processed

pull/689/head
cKey 3 years ago
parent
commit
71fa21f38c
  1. 2
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/ChatMessageEventHandler.cs
  2. 301
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/NotificationEventHandler.cs
  3. 1
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/LY.MicroService.RealtimeMessage.HttpApi.Host.csproj
  4. 11
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/MultiTenancy/TenantConfigurationCache.cs
  5. 23
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.Configure.cs
  6. 1
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.cs
  7. 7
      aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/appsettings.Development.json

2
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/ChatMessageEventHandler.cs

@ -39,7 +39,7 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
Logger = NullLogger<ChatMessageEventHandler>.Instance; Logger = NullLogger<ChatMessageEventHandler>.Instance;
} }
public virtual async Task HandleEventAsync(RealTimeEto<ChatMessage> eventData) public async virtual Task HandleEventAsync(RealTimeEto<ChatMessage> eventData)
{ {
Logger.LogDebug($"Persistent chat message."); Logger.LogDebug($"Persistent chat message.");

301
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/NotificationEventHandler.cs

@ -137,13 +137,51 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
} }
} }
[UnitOfWork]
public async virtual Task HandleEventAsync(NotificationEto<NotificationData> eventData)
{
var notification = await NotificationDefinitionManager.GetOrNullAsync(eventData.Name);
if (notification == null)
{
return;
}
if (notification.NotificationType == NotificationType.System)
{
await SendToTenantAsync(null, notification, eventData);
var allActiveTenants = await TenantConfigurationCache.GetTenantsAsync();
foreach (var activeTenant in allActiveTenants)
{
await SendToTenantAsync(activeTenant.Id, notification, eventData);
}
}
else
{
await SendToTenantAsync(eventData.TenantId, notification, eventData);
}
}
protected async virtual Task SendToTenantAsync( protected async virtual Task SendToTenantAsync(
Guid? tenantId, Guid? tenantId,
NotificationDefinition notification, NotificationDefinition notification,
NotificationEto<NotificationTemplate> eventData) NotificationEto<NotificationTemplate> eventData)
{ {
using (CurrentTenant.Change(tenantId)) using (CurrentTenant.Change(tenantId))
{ {
var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers);
// 过滤用户指定提供者
if (eventData.UseProviders.Any())
{
providers = providers.Where(p => eventData.UseProviders.Contains(p.Name));
}
else if (notification.Providers.Any())
{
providers = providers.Where(p => notification.Providers.Contains(p.Name));
}
var notificationInfo = new NotificationInfo var notificationInfo = new NotificationInfo
{ {
Name = notification.Name, Name = notification.Name,
@ -156,19 +194,29 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
notificationInfo.SetId(eventData.Id); notificationInfo.SetId(eventData.Id);
var title = notification.DisplayName.Localize(StringLocalizerFactory); var title = notification.DisplayName.Localize(StringLocalizerFactory);
var message = "";
var message = await TemplateRenderer.RenderAsync( try
templateName: eventData.Data.Name, {
model: eventData.Data.ExtraProperties, // 由于模板通知受租户影响, 格式化失败的消息将被丢弃.
cultureName: eventData.Data.Culture, message = await TemplateRenderer.RenderAsync(
globalContext: new Dictionary<string, object> templateName: eventData.Data.Name,
{ model: eventData.Data.ExtraProperties,
{ "$notification", notification.Name }, cultureName: eventData.Data.Culture,
{ "$formUser", eventData.Data.FormUser }, globalContext: new Dictionary<string, object>
{ "$notificationId", eventData.Id }, {
{ "$title", title.ToString() }, { "$notification", notification.Name },
{ "$creationTime", eventData.CreationTime.ToString("yyyy-MM-dd HH:mm:ss") }, { "$formUser", eventData.Data.FormUser },
}); { "$notificationId", eventData.Id },
{ "$title", title.ToString() },
{ "$creationTime", eventData.CreationTime.ToString("yyyy-MM-dd HH:mm:ss") },
});
}
catch(Exception ex)
{
Logger.LogWarning("Formatting template notification failed, message will be discarded, cause :{message}", ex.Message);
return;
}
var notificationData = new NotificationData(); var notificationData = new NotificationData();
notificationData.WriteStandardData( notificationData.WriteStandardData(
@ -180,51 +228,25 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
notificationInfo.Data = notificationData; notificationInfo.Data = notificationData;
Logger.LogDebug($"Persistent notification {notificationInfo.Name}"); var subscriptionUsers = await GerSubscriptionUsersAsync(
notificationInfo.Name,
eventData.Users,
tenantId);
// 持久化通知 await PersistentNotificationAsync(
await NotificationStore.InsertNotificationAsync(notificationInfo); notificationInfo,
subscriptionUsers,
providers);
var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers); if (subscriptionUsers.Any())
// 过滤用户指定提供者
if (eventData.UseProviders.Any())
{ {
providers = providers.Where(p => eventData.UseProviders.Contains(p.Name)); // 发布通知
} foreach (var provider in providers)
else if (notification.Providers.Any()) {
{ await PublishToSubscriberAsync(provider, notificationInfo, subscriptionUsers);
providers = providers.Where(p => notification.Providers.Contains(p.Name)); }
}
await PublishFromProvidersAsync(providers, eventData.Users, notificationInfo);
}
}
[UnitOfWork]
public async virtual Task HandleEventAsync(NotificationEto<NotificationData> eventData)
{
var notification = await NotificationDefinitionManager.GetOrNullAsync(eventData.Name);
if (notification == null)
{
return;
}
if (notification.NotificationType == NotificationType.System)
{
await SendToTenantAsync(null, notification, eventData);
var allActiveTenants = await TenantConfigurationCache.GetTenantsAsync();
foreach (var activeTenant in allActiveTenants)
{
await SendToTenantAsync(activeTenant.Id, notification, eventData);
} }
} }
else
{
await SendToTenantAsync(eventData.TenantId, notification, eventData);
}
} }
protected async virtual Task SendToTenantAsync( protected async virtual Task SendToTenantAsync(
@ -234,6 +256,18 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
{ {
using (CurrentTenant.Change(tenantId)) using (CurrentTenant.Change(tenantId))
{ {
var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers);
// 过滤用户指定提供者
if (eventData.UseProviders.Any())
{
providers = providers.Where(p => eventData.UseProviders.Contains(p.Name));
}
else if (notification.Providers.Any())
{
providers = providers.Where(p => notification.Providers.Contains(p.Name));
}
var notificationInfo = new NotificationInfo var notificationInfo = new NotificationInfo
{ {
Name = notification.Name, Name = notification.Name,
@ -249,86 +283,113 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
// TODO: 可以做成一个接口来序列化消息 // TODO: 可以做成一个接口来序列化消息
notificationInfo.Data = NotificationDataConverter.Convert(notificationInfo.Data); notificationInfo.Data = NotificationDataConverter.Convert(notificationInfo.Data);
Logger.LogDebug($"Persistent notification {notificationInfo.Name}"); // 获取用户订阅
var subscriptionUsers = await GerSubscriptionUsersAsync(
notificationInfo.Name,
eventData.Users,
tenantId);
// 持久化通知 // 持久化通知
await NotificationStore.InsertNotificationAsync(notificationInfo); await PersistentNotificationAsync(
notificationInfo,
var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers); subscriptionUsers,
providers);
// 过滤用户指定提供者 if (subscriptionUsers.Any())
if (eventData.UseProviders.Any())
{
providers = providers.Where(p => eventData.UseProviders.Contains(p.Name));
}
else if (notification.Providers.Any())
{ {
providers = providers.Where(p => notification.Providers.Contains(p.Name)); // 发布订阅通知
foreach (var provider in providers)
{
await PublishToSubscriberAsync(provider, notificationInfo, subscriptionUsers);
}
} }
await PublishFromProvidersAsync(providers, eventData.Users, notificationInfo);
} }
} }
/// <summary> /// <summary>
/// 指定提供者发布通知 /// 获取用户订阅列表
/// </summary> /// </summary>
/// <param name="providers">提供者列表</param> /// <param name="notificationName">通知名称</param>
/// <param name="notificationInfo">通知信息</param> /// <param name="sendToUsers">接收用户列表</param>
/// <returns></returns> /// <param name="tenantId">租户标识</param>
protected async Task PublishFromProvidersAsync( /// <returns>用户订阅列表</returns>
IEnumerable<INotificationPublishProvider> providers, protected async Task<IEnumerable<UserIdentifier>> GerSubscriptionUsersAsync(
IEnumerable<UserIdentifier> users, string notificationName,
NotificationInfo notificationInfo) IEnumerable<UserIdentifier> sendToUsers,
Guid? tenantId = null)
{ {
// 检查是够已订阅消息 try
Logger.LogDebug($"Gets a list of user subscriptions {notificationInfo.Name}"); {
// 获取用户订阅列表
// 获取用户订阅列表 var userSubscriptions = await NotificationSubscriptionManager.GetUsersSubscriptionsAsync(
var userSubscriptions = await NotificationSubscriptionManager tenantId,
.GetUsersSubscriptionsAsync(notificationInfo.TenantId, notificationInfo.Name, users); notificationName,
sendToUsers);
users = userSubscriptions.Select(us => new UserIdentifier(us.UserId, us.UserName)); return userSubscriptions.Select(us => new UserIdentifier(us.UserId, us.UserName));
}
catch(Exception ex)
{
Logger.LogWarning("Failed to get user subscription, message will not be received by the user, reason: {message}", ex.Message);
}
if (users.Any()) return new List<UserIdentifier>();
}
/// <summary>
/// 持久化通知并返回订阅用户列表
/// </summary>
/// <param name="notificationInfo">通知实体</param>
/// <param name="subscriptionUsers">订阅用户列表</param>
/// <param name="sendToProviders">通知发送提供者</param>
/// <returns>返回订阅者列表</returns>
protected async Task PersistentNotificationAsync(
NotificationInfo notificationInfo,
IEnumerable<UserIdentifier> subscriptionUsers,
IEnumerable<INotificationPublishProvider> sendToProviders)
{
try
{ {
// 持久化通知
await NotificationStore.InsertNotificationAsync(notificationInfo);
if (!subscriptionUsers.Any())
{
return;
}
// 持久化用户通知 // 持久化用户通知
Logger.LogDebug($"Persistent user notifications {notificationInfo.Name}"); await NotificationStore.InsertUserNotificationsAsync(notificationInfo, subscriptionUsers.Select(u => u.UserId));
await NotificationStore
.InsertUserNotificationsAsync(
notificationInfo,
users.Select(u => u.UserId));
// 2020-11-02 fix bug, 多个发送提供者处于同一个工作单元之下,不能把删除用户订阅写入到单个通知提供者完成事件中
// 而且为了确保一致性,删除订阅移动到发布通知之前
if (notificationInfo.Lifetime == NotificationLifetime.OnlyOne) if (notificationInfo.Lifetime == NotificationLifetime.OnlyOne)
{ {
// 一次性通知在发送完成后就取消用户订阅 // 一次性通知取消用户订阅
await NotificationStore await NotificationStore.DeleteUserSubscriptionAsync(
.DeleteUserSubscriptionAsync( notificationInfo.TenantId,
notificationInfo.TenantId, subscriptionUsers,
users, notificationInfo.Name);
notificationInfo.Name);
} }
}
catch (Exception ex)
{
Logger.LogWarning("Failed to persistent notification failed, reason: {message}", ex.Message);
// 发布通知 foreach (var provider in sendToProviders)
foreach (var provider in providers)
{ {
await PublishAsync(provider, notificationInfo, users); // 处理持久化失败进入后台队列
await ProcessingFailedToQueueAsync(provider, notificationInfo, subscriptionUsers);
} }
} }
} }
/// <summary> /// <summary>
/// 发布通知 /// 发布订阅者通知
/// </summary> /// </summary>
/// <param name="provider">通知发布者</param> /// <param name="provider">通知发布者</param>
/// <param name="notificationInfo">通知信息</param> /// <param name="notificationInfo">通知信息</param>
/// <param name="subscriptionUserIdentifiers">订阅用户列表</param> /// <param name="subscriptionUserIdentifiers">订阅用户列表</param>
/// <returns></returns> /// <returns></returns>
protected async Task PublishAsync( protected async Task PublishToSubscriberAsync(
INotificationPublishProvider provider, INotificationPublishProvider provider,
NotificationInfo notificationInfo, NotificationInfo notificationInfo,
IEnumerable<UserIdentifier> subscriptionUserIdentifiers) IEnumerable<UserIdentifier> subscriptionUsers)
{ {
try try
{ {
@ -340,7 +401,7 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
notificationInfo.Data = notifacationDataMapping.MappingFunc(notificationInfo.Data); notificationInfo.Data = notifacationDataMapping.MappingFunc(notificationInfo.Data);
} }
// 发布 // 发布
await provider.PublishAsync(notificationInfo, subscriptionUserIdentifiers); await provider.PublishAsync(notificationInfo, subscriptionUsers);
Logger.LogDebug($"Send notification {notificationInfo.Name} with provider {provider.Name} was successful"); Logger.LogDebug($"Send notification {notificationInfo.Name} with provider {provider.Name} was successful");
} }
@ -348,18 +409,40 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
{ {
Logger.LogWarning($"Send notification error with provider {provider.Name}"); Logger.LogWarning($"Send notification error with provider {provider.Name}");
Logger.LogWarning($"Error message:{ex.Message}"); Logger.LogWarning($"Error message:{ex.Message}");
Logger.LogDebug($"Failed to send notification {notificationInfo.Name}. Try to push notification to background job");
Logger.LogTrace(ex, $"Send notification error with provider { provider.Name}"); // 发送失败的消息进入后台队列
await ProcessingFailedToQueueAsync(provider, notificationInfo, subscriptionUsers);
Logger.LogDebug($"Send notification error, notification {notificationInfo.Name} entry queue"); }
}
/// <summary>
/// 处理失败的消息进入后台队列
/// </summary>
/// <remarks>
/// 注: 如果入队失败,消息将被丢弃.
/// </remarks>
/// <param name="provider"></param>
/// <param name="notificationInfo"></param>
/// <param name="subscriptionUsers"></param>
/// <returns></returns>
protected async Task ProcessingFailedToQueueAsync(
INotificationPublishProvider provider,
NotificationInfo notificationInfo,
IEnumerable<UserIdentifier> subscriptionUsers)
{
try
{
// 发送失败的消息进入后台队列 // 发送失败的消息进入后台队列
await BackgroundJobManager.EnqueueAsync( await BackgroundJobManager.EnqueueAsync(
new NotificationPublishJobArgs( new NotificationPublishJobArgs(
notificationInfo.GetId(), notificationInfo.GetId(),
provider.GetType().AssemblyQualifiedName, provider.GetType().AssemblyQualifiedName,
subscriptionUserIdentifiers.ToList(), subscriptionUsers.ToList(),
notificationInfo.TenantId)); notificationInfo.TenantId));
} }
catch(Exception ex)
{
Logger.LogWarning("Failed to push to background job, notification will be discarded, error cause: {message}", ex.Message);
}
} }
} }
} }

1
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/LY.MicroService.RealtimeMessage.HttpApi.Host.csproj

@ -19,6 +19,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="Microsoft.AspNetCore.DataProtection.StackExchangeRedis" Version="$(MicrosoftPackageVersion)" /> <PackageReference Include="Microsoft.AspNetCore.DataProtection.StackExchangeRedis" Version="$(MicrosoftPackageVersion)" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.StackExchangeRedis" Version="$(MicrosoftPackageVersion)" />
<PackageReference Include="Quartz.Serialization.Json" Version="$(QuartzNETPackageVersion)" /> <PackageReference Include="Quartz.Serialization.Json" Version="$(QuartzNETPackageVersion)" />
<PackageReference Include="Serilog.AspNetCore" Version="$(SerilogAspNetCorePackageVersion)" /> <PackageReference Include="Serilog.AspNetCore" Version="$(SerilogAspNetCorePackageVersion)" />
<PackageReference Include="Serilog.Enrichers.Environment" Version="$(SerilogEnrichersEnvironmentPackageVersion)" /> <PackageReference Include="Serilog.Enrichers.Environment" Version="$(SerilogEnrichersEnvironmentPackageVersion)" />

11
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/MultiTenancy/TenantConfigurationCache.cs

@ -35,11 +35,12 @@ public class TenantConfigurationCache : ITenantConfigurationCache, ITransientDep
var allActiveTenants = await TenantRepository.GetListAsync(); var allActiveTenants = await TenantRepository.GetListAsync();
cacheItem = new TenantConfigurationCacheItem( cacheItem = new TenantConfigurationCacheItem(
allActiveTenants.Select(t => allActiveTenants
new TenantConfiguration(t.Id, t.Name) .Where(t => t.IsActive)
{ .Select(t => new TenantConfiguration(t.Id, t.Name)
IsActive = t.IsActive, {
}).ToList()); IsActive = t.IsActive,
}).ToList());
await TenantCache.SetAsync(cacheKey, cacheItem); await TenantCache.SetAsync(cacheKey, cacheItem);
} }

23
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.Configure.cs

@ -9,6 +9,7 @@ using LY.MicroService.RealtimeMessage.BackgroundJobs;
using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Cors; using Microsoft.AspNetCore.Cors;
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.Caching.StackExchangeRedis;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -192,6 +193,28 @@ public partial class RealtimeMessageHttpApiHostModule
}); });
} }
private void PreConfigureSignalR(IConfiguration configuration)
{
PreConfigure<ISignalRServerBuilder>(builder =>
{
var redisEnabled = configuration["SignalR:Redis:IsEnabled"];
if (redisEnabled.IsNullOrEmpty() || bool.Parse(redisEnabled))
{
builder.AddStackExchangeRedis(redis =>
{
var redisConfiguration = configuration["SignalR:Redis:Configuration"];
if (!redisConfiguration.IsNullOrEmpty())
{
redis.ConnectionFactory = async (writer) =>
{
return await ConnectionMultiplexer.ConnectAsync(redisConfiguration);
};
}
});
}
});
}
private void ConfigureVirtualFileSystem() private void ConfigureVirtualFileSystem()
{ {
Configure<AbpVirtualFileSystemOptions>(options => Configure<AbpVirtualFileSystemOptions>(options =>

1
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.cs

@ -105,6 +105,7 @@ public partial class RealtimeMessageHttpApiHostModule : AbpModule
PreConfigureFeature(); PreConfigureFeature();
PreConfigureCAP(configuration); PreConfigureCAP(configuration);
PreConfigureQuartz(configuration); PreConfigureQuartz(configuration);
PreConfigureSignalR(configuration);
} }
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)

7
aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/appsettings.Development.json

@ -34,9 +34,16 @@
"AbpTextTemplating": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456" "AbpTextTemplating": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456"
}, },
"Redis": { "Redis": {
"IsEnabled": true,
"Configuration": "127.0.0.1,defaultDatabase=8", "Configuration": "127.0.0.1,defaultDatabase=8",
"InstanceName": "LINGYUN.Abp.Application" "InstanceName": "LINGYUN.Abp.Application"
}, },
"SignalR": {
"Redis": {
"IsEnabled": true,
"Configuration": "127.0.0.1,defaultDatabase=13,channelPrefix=abp-realtime-channel"
}
},
"AuthServer": { "AuthServer": {
"Authority": "http://127.0.0.1:44385/", "Authority": "http://127.0.0.1:44385/",
"ApiName": "lingyun-abp-application" "ApiName": "lingyun-abp-application"

Loading…
Cancel
Save