From 71fa21f38c717b94a38f32ab6306861e6dedd354 Mon Sep 17 00:00:00 2001 From: cKey <35512826+colinin@users.noreply.github.com> Date: Tue, 6 Sep 2022 15:36:33 +0800 Subject: [PATCH] Ensure that message queue processing completes regardless of whether the message was successfully processed --- .../Distributed/ChatMessageEventHandler.cs | 2 +- .../Distributed/NotificationEventHandler.cs | 301 +++++++++++------- ...ervice.RealtimeMessage.HttpApi.Host.csproj | 1 + .../MultiTenancy/TenantConfigurationCache.cs | 11 +- ...ltimeMessageHttpApiHostModule.Configure.cs | 23 ++ .../RealtimeMessageHttpApiHostModule.cs | 1 + .../appsettings.Development.json | 7 + 7 files changed, 231 insertions(+), 115 deletions(-) diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/ChatMessageEventHandler.cs b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/ChatMessageEventHandler.cs index e9262e9d1..bffbe9242 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/ChatMessageEventHandler.cs +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/ChatMessageEventHandler.cs @@ -39,7 +39,7 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed Logger = NullLogger.Instance; } - public virtual async Task HandleEventAsync(RealTimeEto eventData) + public async virtual Task HandleEventAsync(RealTimeEto eventData) { Logger.LogDebug($"Persistent chat message."); diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/NotificationEventHandler.cs b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/NotificationEventHandler.cs index f0d6e5479..bb03a15c9 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/NotificationEventHandler.cs +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/EventBus/Distributed/NotificationEventHandler.cs @@ -137,13 +137,51 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed } } + [UnitOfWork] + public async virtual Task HandleEventAsync(NotificationEto eventData) + { + var notification = await NotificationDefinitionManager.GetOrNullAsync(eventData.Name); + if (notification == null) + { + return; + } + + if (notification.NotificationType == NotificationType.System) + { + await SendToTenantAsync(null, notification, eventData); + + var allActiveTenants = await TenantConfigurationCache.GetTenantsAsync(); + + foreach (var activeTenant in allActiveTenants) + { + await SendToTenantAsync(activeTenant.Id, notification, eventData); + } + } + else + { + await SendToTenantAsync(eventData.TenantId, notification, eventData); + } + } + protected async virtual Task SendToTenantAsync( - Guid? tenantId, + Guid? tenantId, NotificationDefinition notification, NotificationEto eventData) { using (CurrentTenant.Change(tenantId)) { + var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers); + + // 过滤用户指定提供者 + if (eventData.UseProviders.Any()) + { + providers = providers.Where(p => eventData.UseProviders.Contains(p.Name)); + } + else if (notification.Providers.Any()) + { + providers = providers.Where(p => notification.Providers.Contains(p.Name)); + } + var notificationInfo = new NotificationInfo { Name = notification.Name, @@ -156,19 +194,29 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed notificationInfo.SetId(eventData.Id); var title = notification.DisplayName.Localize(StringLocalizerFactory); + var message = ""; - var message = await TemplateRenderer.RenderAsync( - templateName: eventData.Data.Name, - model: eventData.Data.ExtraProperties, - cultureName: eventData.Data.Culture, - globalContext: new Dictionary - { - { "$notification", notification.Name }, - { "$formUser", eventData.Data.FormUser }, - { "$notificationId", eventData.Id }, - { "$title", title.ToString() }, - { "$creationTime", eventData.CreationTime.ToString("yyyy-MM-dd HH:mm:ss") }, - }); + try + { + // 由于模板通知受租户影响, 格式化失败的消息将被丢弃. + message = await TemplateRenderer.RenderAsync( + templateName: eventData.Data.Name, + model: eventData.Data.ExtraProperties, + cultureName: eventData.Data.Culture, + globalContext: new Dictionary + { + { "$notification", notification.Name }, + { "$formUser", eventData.Data.FormUser }, + { "$notificationId", eventData.Id }, + { "$title", title.ToString() }, + { "$creationTime", eventData.CreationTime.ToString("yyyy-MM-dd HH:mm:ss") }, + }); + } + catch(Exception ex) + { + Logger.LogWarning("Formatting template notification failed, message will be discarded, cause :{message}", ex.Message); + return; + } var notificationData = new NotificationData(); notificationData.WriteStandardData( @@ -180,51 +228,25 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed notificationInfo.Data = notificationData; - Logger.LogDebug($"Persistent notification {notificationInfo.Name}"); + var subscriptionUsers = await GerSubscriptionUsersAsync( + notificationInfo.Name, + eventData.Users, + tenantId); - // 持久化通知 - await NotificationStore.InsertNotificationAsync(notificationInfo); + await PersistentNotificationAsync( + notificationInfo, + subscriptionUsers, + providers); - var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers); - - // 过滤用户指定提供者 - if (eventData.UseProviders.Any()) + if (subscriptionUsers.Any()) { - providers = providers.Where(p => eventData.UseProviders.Contains(p.Name)); - } - else if (notification.Providers.Any()) - { - providers = providers.Where(p => notification.Providers.Contains(p.Name)); - } - - await PublishFromProvidersAsync(providers, eventData.Users, notificationInfo); - } - } - - [UnitOfWork] - public async virtual Task HandleEventAsync(NotificationEto eventData) - { - var notification = await NotificationDefinitionManager.GetOrNullAsync(eventData.Name); - if (notification == null) - { - return; - } - - if (notification.NotificationType == NotificationType.System) - { - await SendToTenantAsync(null, notification, eventData); - - var allActiveTenants = await TenantConfigurationCache.GetTenantsAsync(); - - foreach (var activeTenant in allActiveTenants) - { - await SendToTenantAsync(activeTenant.Id, notification, eventData); + // 发布通知 + foreach (var provider in providers) + { + await PublishToSubscriberAsync(provider, notificationInfo, subscriptionUsers); + } } } - else - { - await SendToTenantAsync(eventData.TenantId, notification, eventData); - } } protected async virtual Task SendToTenantAsync( @@ -234,6 +256,18 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed { using (CurrentTenant.Change(tenantId)) { + var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers); + + // 过滤用户指定提供者 + if (eventData.UseProviders.Any()) + { + providers = providers.Where(p => eventData.UseProviders.Contains(p.Name)); + } + else if (notification.Providers.Any()) + { + providers = providers.Where(p => notification.Providers.Contains(p.Name)); + } + var notificationInfo = new NotificationInfo { Name = notification.Name, @@ -249,86 +283,113 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed // TODO: 可以做成一个接口来序列化消息 notificationInfo.Data = NotificationDataConverter.Convert(notificationInfo.Data); - Logger.LogDebug($"Persistent notification {notificationInfo.Name}"); + // 获取用户订阅 + var subscriptionUsers = await GerSubscriptionUsersAsync( + notificationInfo.Name, + eventData.Users, + tenantId); // 持久化通知 - await NotificationStore.InsertNotificationAsync(notificationInfo); - - var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers); + await PersistentNotificationAsync( + notificationInfo, + subscriptionUsers, + providers); - // 过滤用户指定提供者 - if (eventData.UseProviders.Any()) - { - providers = providers.Where(p => eventData.UseProviders.Contains(p.Name)); - } - else if (notification.Providers.Any()) + if (subscriptionUsers.Any()) { - providers = providers.Where(p => notification.Providers.Contains(p.Name)); + // 发布订阅通知 + foreach (var provider in providers) + { + await PublishToSubscriberAsync(provider, notificationInfo, subscriptionUsers); + } } - - await PublishFromProvidersAsync(providers, eventData.Users, notificationInfo); } } - /// - /// 指定提供者发布通知 + /// 获取用户订阅列表 /// - /// 提供者列表 - /// 通知信息 - /// - protected async Task PublishFromProvidersAsync( - IEnumerable providers, - IEnumerable users, - NotificationInfo notificationInfo) + /// 通知名称 + /// 接收用户列表 + /// 租户标识 + /// 用户订阅列表 + protected async Task> GerSubscriptionUsersAsync( + string notificationName, + IEnumerable sendToUsers, + Guid? tenantId = null) { - // 检查是够已订阅消息 - Logger.LogDebug($"Gets a list of user subscriptions {notificationInfo.Name}"); - - // 获取用户订阅列表 - var userSubscriptions = await NotificationSubscriptionManager - .GetUsersSubscriptionsAsync(notificationInfo.TenantId, notificationInfo.Name, users); + try + { + // 获取用户订阅列表 + var userSubscriptions = await NotificationSubscriptionManager.GetUsersSubscriptionsAsync( + tenantId, + notificationName, + sendToUsers); - users = userSubscriptions.Select(us => new UserIdentifier(us.UserId, us.UserName)); + return userSubscriptions.Select(us => new UserIdentifier(us.UserId, us.UserName)); + } + catch(Exception ex) + { + Logger.LogWarning("Failed to get user subscription, message will not be received by the user, reason: {message}", ex.Message); + } - if (users.Any()) + return new List(); + } + /// + /// 持久化通知并返回订阅用户列表 + /// + /// 通知实体 + /// 订阅用户列表 + /// 通知发送提供者 + /// 返回订阅者列表 + protected async Task PersistentNotificationAsync( + NotificationInfo notificationInfo, + IEnumerable subscriptionUsers, + IEnumerable sendToProviders) + { + try { + // 持久化通知 + await NotificationStore.InsertNotificationAsync(notificationInfo); + + if (!subscriptionUsers.Any()) + { + return; + } + // 持久化用户通知 - Logger.LogDebug($"Persistent user notifications {notificationInfo.Name}"); - await NotificationStore - .InsertUserNotificationsAsync( - notificationInfo, - users.Select(u => u.UserId)); - - // 2020-11-02 fix bug, 多个发送提供者处于同一个工作单元之下,不能把删除用户订阅写入到单个通知提供者完成事件中 - // 而且为了确保一致性,删除订阅移动到发布通知之前 + await NotificationStore.InsertUserNotificationsAsync(notificationInfo, subscriptionUsers.Select(u => u.UserId)); + if (notificationInfo.Lifetime == NotificationLifetime.OnlyOne) { - // 一次性通知在发送完成后就取消用户订阅 - await NotificationStore - .DeleteUserSubscriptionAsync( - notificationInfo.TenantId, - users, - notificationInfo.Name); + // 一次性通知取消用户订阅 + await NotificationStore.DeleteUserSubscriptionAsync( + notificationInfo.TenantId, + subscriptionUsers, + notificationInfo.Name); } + } + catch (Exception ex) + { + Logger.LogWarning("Failed to persistent notification failed, reason: {message}", ex.Message); - // 发布通知 - foreach (var provider in providers) + foreach (var provider in sendToProviders) { - await PublishAsync(provider, notificationInfo, users); + // 处理持久化失败进入后台队列 + await ProcessingFailedToQueueAsync(provider, notificationInfo, subscriptionUsers); } } } /// - /// 发布通知 + /// 发布订阅者通知 /// /// 通知发布者 /// 通知信息 /// 订阅用户列表 /// - protected async Task PublishAsync( + protected async Task PublishToSubscriberAsync( INotificationPublishProvider provider, NotificationInfo notificationInfo, - IEnumerable subscriptionUserIdentifiers) + IEnumerable subscriptionUsers) { try { @@ -340,7 +401,7 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed notificationInfo.Data = notifacationDataMapping.MappingFunc(notificationInfo.Data); } // 发布 - await provider.PublishAsync(notificationInfo, subscriptionUserIdentifiers); + await provider.PublishAsync(notificationInfo, subscriptionUsers); Logger.LogDebug($"Send notification {notificationInfo.Name} with provider {provider.Name} was successful"); } @@ -348,18 +409,40 @@ namespace LY.MicroService.RealtimeMessage.EventBus.Distributed { Logger.LogWarning($"Send notification error with provider {provider.Name}"); Logger.LogWarning($"Error message:{ex.Message}"); - - Logger.LogTrace(ex, $"Send notification error with provider { provider.Name}"); - - Logger.LogDebug($"Send notification error, notification {notificationInfo.Name} entry queue"); + Logger.LogDebug($"Failed to send notification {notificationInfo.Name}. Try to push notification to background job"); + // 发送失败的消息进入后台队列 + await ProcessingFailedToQueueAsync(provider, notificationInfo, subscriptionUsers); + } + } + /// + /// 处理失败的消息进入后台队列 + /// + /// + /// 注: 如果入队失败,消息将被丢弃. + /// + /// + /// + /// + /// + protected async Task ProcessingFailedToQueueAsync( + INotificationPublishProvider provider, + NotificationInfo notificationInfo, + IEnumerable subscriptionUsers) + { + try + { // 发送失败的消息进入后台队列 await BackgroundJobManager.EnqueueAsync( new NotificationPublishJobArgs( notificationInfo.GetId(), provider.GetType().AssemblyQualifiedName, - subscriptionUserIdentifiers.ToList(), + subscriptionUsers.ToList(), notificationInfo.TenantId)); } + catch(Exception ex) + { + Logger.LogWarning("Failed to push to background job, notification will be discarded, error cause: {message}", ex.Message); + } } } } diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/LY.MicroService.RealtimeMessage.HttpApi.Host.csproj b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/LY.MicroService.RealtimeMessage.HttpApi.Host.csproj index f8acb5acf..91c10d8a5 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/LY.MicroService.RealtimeMessage.HttpApi.Host.csproj +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/LY.MicroService.RealtimeMessage.HttpApi.Host.csproj @@ -19,6 +19,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/MultiTenancy/TenantConfigurationCache.cs b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/MultiTenancy/TenantConfigurationCache.cs index d6f3dbb55..9f0450a6b 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/MultiTenancy/TenantConfigurationCache.cs +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/MultiTenancy/TenantConfigurationCache.cs @@ -35,11 +35,12 @@ public class TenantConfigurationCache : ITenantConfigurationCache, ITransientDep var allActiveTenants = await TenantRepository.GetListAsync(); cacheItem = new TenantConfigurationCacheItem( - allActiveTenants.Select(t => - new TenantConfiguration(t.Id, t.Name) - { - IsActive = t.IsActive, - }).ToList()); + allActiveTenants + .Where(t => t.IsActive) + .Select(t => new TenantConfiguration(t.Id, t.Name) + { + IsActive = t.IsActive, + }).ToList()); await TenantCache.SetAsync(cacheKey, cacheItem); } diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.Configure.cs b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.Configure.cs index 0f8659f28..704ebdc52 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.Configure.cs +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.Configure.cs @@ -9,6 +9,7 @@ using LY.MicroService.RealtimeMessage.BackgroundJobs; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Cors; using Microsoft.AspNetCore.DataProtection; +using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -192,6 +193,28 @@ public partial class RealtimeMessageHttpApiHostModule }); } + private void PreConfigureSignalR(IConfiguration configuration) + { + PreConfigure(builder => + { + var redisEnabled = configuration["SignalR:Redis:IsEnabled"]; + if (redisEnabled.IsNullOrEmpty() || bool.Parse(redisEnabled)) + { + builder.AddStackExchangeRedis(redis => + { + var redisConfiguration = configuration["SignalR:Redis:Configuration"]; + if (!redisConfiguration.IsNullOrEmpty()) + { + redis.ConnectionFactory = async (writer) => + { + return await ConnectionMultiplexer.ConnectAsync(redisConfiguration); + }; + } + }); + } + }); + } + private void ConfigureVirtualFileSystem() { Configure(options => diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.cs b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.cs index 19d593f0d..78ef7910c 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.cs +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/RealtimeMessageHttpApiHostModule.cs @@ -105,6 +105,7 @@ public partial class RealtimeMessageHttpApiHostModule : AbpModule PreConfigureFeature(); PreConfigureCAP(configuration); PreConfigureQuartz(configuration); + PreConfigureSignalR(configuration); } public override void ConfigureServices(ServiceConfigurationContext context) diff --git a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/appsettings.Development.json b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/appsettings.Development.json index 5786da2c9..30553c5dd 100644 --- a/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/appsettings.Development.json +++ b/aspnet-core/services/LY.MicroService.RealtimeMessage.HttpApi.Host/appsettings.Development.json @@ -34,9 +34,16 @@ "AbpTextTemplating": "Server=127.0.0.1;Database=Platform;User Id=root;Password=123456" }, "Redis": { + "IsEnabled": true, "Configuration": "127.0.0.1,defaultDatabase=8", "InstanceName": "LINGYUN.Abp.Application" }, + "SignalR": { + "Redis": { + "IsEnabled": true, + "Configuration": "127.0.0.1,defaultDatabase=13,channelPrefix=abp-realtime-channel" + } + }, "AuthServer": { "Authority": "http://127.0.0.1:44385/", "ApiName": "lingyun-abp-application"