using LINGYUN.Abp.Notifications;
using LINGYUN.Abp.Notifications.Templating;
using LY.MicroService.RealtimeMessage.BackgroundJobs;
using LY.MicroService.RealtimeMessage.MultiTenancy;
using Microsoft.Extensions.Localization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Json;
using Volo.Abp.Localization;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Settings;
using Volo.Abp.TextTemplating;
using Volo.Abp.Timing;
using Volo.Abp.Uow;

namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
{
    /// <summary>
    /// Subscribes to notification publish events and publishes the messages in one place.
    /// </summary>
    /// <remarks>
    /// A SignalR client only keeps a connection to a single server, so only a host
    /// that has the SignalR server enabled can actually deliver a message to the client.
    /// </remarks>
    public class NotificationEventHandler :
        IDistributedEventHandler<NotificationEto<NotificationTemplate>>,
        IDistributedEventHandler<NotificationEto<NotificationData>>,
        ITransientDependency
    {
        /// <summary>
        /// Reference to <see cref="ILogger{NotificationEventHandler}"/>.
        /// </summary>
        public ILogger<NotificationEventHandler> Logger { get; set; }
        /// <summary>
        /// Reference to <see cref="AbpNotificationsPublishOptions"/>.
        /// </summary>
        protected AbpNotificationsPublishOptions Options { get; }
        /// <summary>
        /// Reference to <see cref="IClock"/>.
        /// </summary>
        protected IClock Clock { get; }
        /// <summary>
        /// Reference to <see cref="ISettingProvider"/>.
        /// </summary>
        protected ISettingProvider SettingProvider { get; }
        /// <summary>
        /// Reference to <see cref="ICurrentTenant"/>.
        /// </summary>
        protected ICurrentTenant CurrentTenant { get; }
        /// <summary>
        /// Reference to <see cref="ITenantConfigurationCache"/>.
        /// </summary>
        protected ITenantConfigurationCache TenantConfigurationCache { get; }
        /// <summary>
        /// Reference to <see cref="IJsonSerializer"/>.
        /// </summary>
        protected IJsonSerializer JsonSerializer { get; }
        /// <summary>
        /// Reference to <see cref="IBackgroundJobManager"/>.
        /// </summary>
        protected IBackgroundJobManager BackgroundJobManager { get; }
        /// <summary>
        /// Reference to <see cref="ITemplateRenderer"/>.
        /// </summary>
        protected ITemplateRenderer TemplateRenderer { get; }
        /// <summary>
        /// Reference to <see cref="INotificationStore"/>.
        /// </summary>
        protected INotificationStore NotificationStore { get; }
        /// <summary>
        /// Reference to <see cref="IStringLocalizerFactory"/>.
        /// </summary>
        protected IStringLocalizerFactory StringLocalizerFactory { get; }
        /// <summary>
        /// Reference to <see cref="INotificationDataSerializer"/>.
        /// </summary>
        protected INotificationDataSerializer NotificationDataSerializer { get; }
        /// <summary>
        /// Reference to <see cref="INotificationTemplateResolver"/>.
        /// </summary>
        protected INotificationTemplateResolver NotificationTemplateResolver { get; }
        /// <summary>
        /// Reference to <see cref="INotificationDefinitionManager"/>.
        /// </summary>
        protected INotificationDefinitionManager NotificationDefinitionManager { get; }
        /// <summary>
        /// Reference to <see cref="INotificationSubscriptionManager"/>.
        /// </summary>
        protected INotificationSubscriptionManager NotificationSubscriptionManager { get; }
        /// <summary>
        /// Reference to <see cref="INotificationPublishProviderManager"/>.
        /// </summary>
        protected INotificationPublishProviderManager NotificationPublishProviderManager { get; }

        /// <summary>
        /// Initializes a new instance of the <see cref="NotificationEventHandler"/> class.
        /// </summary>
        public NotificationEventHandler(
            IClock clock,
            ICurrentTenant currentTenant,
            ISettingProvider settingProvider,
            ITenantConfigurationCache tenantConfigurationCache,
            IJsonSerializer jsonSerializer,
            ITemplateRenderer templateRenderer,
            IBackgroundJobManager backgroundJobManager,
            IStringLocalizerFactory stringLocalizerFactory,
            IOptions<AbpNotificationsPublishOptions> options,
            INotificationStore notificationStore,
            INotificationDataSerializer notificationDataSerializer,
            INotificationTemplateResolver notificationTemplateResolver,
            INotificationDefinitionManager notificationDefinitionManager,
            INotificationSubscriptionManager notificationSubscriptionManager,
            INotificationPublishProviderManager notificationPublishProviderManager)
        {
            Clock = clock;
            Options = options.Value;
            TenantConfigurationCache = tenantConfigurationCache;
            CurrentTenant = currentTenant;
            SettingProvider = settingProvider;
            JsonSerializer = jsonSerializer;
            TemplateRenderer = templateRenderer;
            BackgroundJobManager = backgroundJobManager;
            StringLocalizerFactory = stringLocalizerFactory;
            NotificationStore = notificationStore;
            NotificationDataSerializer = notificationDataSerializer;
            NotificationTemplateResolver = notificationTemplateResolver;
            NotificationDefinitionManager = notificationDefinitionManager;
            NotificationSubscriptionManager = notificationSubscriptionManager;
            NotificationPublishProviderManager = notificationPublishProviderManager;

            // Fallback logger; the public setter allows a real logger to be assigned later.
            Logger = NullLogger<NotificationEventHandler>.Instance;
        }

        /// <summary>
        /// Handles a template notification event: resolves the template under the
        /// notification culture and sends the rendered notification to the target tenant(s).
        /// </summary>
        /// <param name="eventData">The published template notification event.</param>
        [UnitOfWork]
        public async virtual Task HandleEventAsync(NotificationEto<NotificationTemplate> eventData)
        {
            var notification = await NotificationDefinitionManager.GetOrNullAsync(eventData.Name);
            if (notification == null)
            {
                // Notifications without a definition are ignored.
                return;
            }

            var culture = await ResolveCultureAsync(eventData.Data.Culture);
            using (CultureHelper.Use(culture, culture))
            {
                var result = await NotificationTemplateResolver.ResolveAsync(eventData.Data);

                if (notification.NotificationType == NotificationType.System)
                {
                    // System notifications go to the host side first, then to every
                    // tenant returned by the tenant configuration cache.
                    using (CurrentTenant.Change(null))
                    {
                        await SendToTenantAsync(null, notification, eventData, result);

                        var allActiveTenants = await TenantConfigurationCache.GetTenantsAsync();
                        foreach (var activeTenant in allActiveTenants)
                        {
                            await SendToTenantAsync(activeTenant.Id, notification, eventData, result);
                        }
                    }
                }
                else
                {
                    await SendToTenantAsync(eventData.TenantId, notification, eventData, result);
                }
            }
        }

        /// <summary>
        /// Handles a plain data notification event and sends it to the target tenant(s)
        /// under the notification culture.
        /// </summary>
        /// <param name="eventData">The published data notification event.</param>
        [UnitOfWork]
        public async virtual Task HandleEventAsync(NotificationEto<NotificationData> eventData)
        {
            var notification = await NotificationDefinitionManager.GetOrNullAsync(eventData.Name);
            if (notification == null)
            {
                // Notifications without a definition are ignored.
                return;
            }

            var culture = await ResolveCultureAsync(
                eventData.Data.TryGetData(NotificationData.CultureKey)?.ToString());
            using (CultureHelper.Use(culture, culture))
            {
                if (notification.NotificationType == NotificationType.System)
                {
                    // System notifications go to the host side first, then to every
                    // tenant returned by the tenant configuration cache.
                    using (CurrentTenant.Change(null))
                    {
                        await SendToTenantAsync(null, notification, eventData);

                        var allActiveTenants = await TenantConfigurationCache.GetTenantsAsync();
                        foreach (var activeTenant in allActiveTenants)
                        {
                            await SendToTenantAsync(activeTenant.Id, notification, eventData);
                        }
                    }
                }
                else
                {
                    await SendToTenantAsync(eventData.TenantId, notification, eventData);
                }
            }
        }

        /// <summary>
        /// Renders a template notification for one tenant, then persists and publishes it.
        /// </summary>
        /// <param name="tenantId">Tenant to send to; <c>null</c> for the host side.</param>
        /// <param name="notification">Definition of the notification being sent.</param>
        /// <param name="eventData">The template notification event.</param>
        /// <param name="templateResolveResult">Result of resolving the notification template.</param>
        protected async virtual Task SendToTenantAsync(
            Guid? tenantId,
            NotificationDefinition notification,
            NotificationEto<NotificationTemplate> eventData,
            NotificationTemplateResolveResult templateResolveResult)
        {
            using (CurrentTenant.Change(tenantId))
            {
                var providers = SelectProviders(eventData.UseProviders, notification);

                var notificationInfo = new NotificationInfo
                {
                    Name = notification.Name,
                    TenantId = tenantId,
                    Severity = eventData.Severity,
                    Type = notification.NotificationType,
                    ContentType = notification.ContentType,
                    CreationTime = eventData.CreationTime,
                    Lifetime = notification.NotificationLifetime,
                };
                notificationInfo.SetId(eventData.Id);

                var title = notification.DisplayName.Localize(StringLocalizerFactory);
                var message = "";

                try
                {
                    // Template notifications depend on the tenant; a message that
                    // fails to render is discarded.
                    message = await TemplateRenderer.RenderAsync(
                        templateName: eventData.Data.Name,
                        // Works around serialized data that cannot be used to render the template.
                        model: templateResolveResult.Model ?? eventData.Data.ExtraProperties,
                        cultureName: eventData.Data.Culture,
                        globalContext: new Dictionary<string, object>
                        {
                            // Templates do not support the '$' character, so plain keywords are used.
                            { NotificationKeywords.Name, notification.Name },
                            { NotificationKeywords.FormUser, eventData.Data.FormUser },
                            { NotificationKeywords.Id, eventData.Id },
                            { NotificationKeywords.Title, title.ToString() },
                            { NotificationKeywords.CreationTime, eventData.CreationTime.ToString(Options.DateTimeFormat) },
                        });
                }
                catch (Exception ex)
                {
                    Logger.LogWarning(ex, "Formatting template notification failed, message will be discarded, cause :{message}", ex.Message);
                    return;
                }

                var notificationData = new NotificationData();
                notificationData.WriteStandardData(
                    title: title.ToString(),
                    message: message,
                    createTime: eventData.CreationTime,
                    formUser: eventData.Data.FormUser);
                notificationData.ExtraProperties.AddIfNotContains(eventData.Data.ExtraProperties);

                notificationInfo.Data = notificationData;

                await PersistAndPublishAsync(notificationInfo, eventData.Users, tenantId, providers);
            }
        }

        /// <summary>
        /// Persists and publishes a plain data notification for one tenant.
        /// </summary>
        /// <param name="tenantId">Tenant to send to; <c>null</c> for the host side.</param>
        /// <param name="notification">Definition of the notification being sent.</param>
        /// <param name="eventData">The data notification event.</param>
        protected async virtual Task SendToTenantAsync(
            Guid? tenantId,
            NotificationDefinition notification,
            NotificationEto<NotificationData> eventData)
        {
            using (CurrentTenant.Change(tenantId))
            {
                var providers = SelectProviders(eventData.UseProviders, notification);

                var notificationInfo = new NotificationInfo
                {
                    Name = notification.Name,
                    CreationTime = eventData.CreationTime,
                    Data = eventData.Data,
                    Severity = eventData.Severity,
                    Lifetime = notification.NotificationLifetime,
                    TenantId = tenantId,
                    Type = notification.NotificationType,
                    ContentType = notification.ContentType,
                };
                notificationInfo.SetId(eventData.Id);

                notificationInfo.Data = NotificationDataSerializer.Serialize(notificationInfo.Data);

                await PersistAndPublishAsync(notificationInfo, eventData.Users, tenantId, providers);
            }
        }

        /// <summary>
        /// Gets the users subscribed to a notification.
        /// </summary>
        /// <param name="notificationName">Name of the notification.</param>
        /// <param name="sendToUsers">Users the notification is addressed to.</param>
        /// <param name="tenantId">Tenant identifier.</param>
        /// <returns>
        /// The subscribed users, or an empty list when the subscriptions could not be loaded.
        /// </returns>
        protected async Task<IEnumerable<UserIdentifier>> GerSubscriptionUsersAsync(
            string notificationName,
            IEnumerable<UserIdentifier> sendToUsers,
            Guid? tenantId = null)
        {
            try
            {
                var userSubscriptions = await NotificationSubscriptionManager.GetUsersSubscriptionsAsync(
                    tenantId,
                    notificationName,
                    sendToUsers);

                // Materialize here: callers enumerate the result several times, and a
                // failure inside the projection should be handled by this catch.
                return userSubscriptions
                    .Select(us => new UserIdentifier(us.UserId, us.UserName))
                    .ToList();
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Failed to get user subscription, message will not be received by the user, reason: {message}", ex.Message);
            }

            return new List<UserIdentifier>();
        }

        /// <summary>
        /// Persists the notification and, when there are subscribers, the per-user notifications.
        /// </summary>
        /// <remarks>
        /// For <see cref="NotificationLifetime.OnlyOne"/> notifications the user subscriptions are
        /// removed after persisting. If persisting fails, the notification is pushed to the
        /// background job queue once per provider.
        /// </remarks>
        /// <param name="notificationInfo">The notification to persist.</param>
        /// <param name="subscriptionUsers">Users subscribed to the notification.</param>
        /// <param name="sendToProviders">Providers the notification is sent with.</param>
        protected async Task PersistentNotificationAsync(
            NotificationInfo notificationInfo,
            IEnumerable<UserIdentifier> subscriptionUsers,
            IEnumerable<INotificationPublishProvider> sendToProviders)
        {
            // The sequence is read several times below; snapshot it unless it already is a collection.
            var users = subscriptionUsers as IReadOnlyCollection<UserIdentifier>
                ?? subscriptionUsers.ToList();

            try
            {
                await NotificationStore.InsertNotificationAsync(notificationInfo);

                if (users.Count == 0)
                {
                    return;
                }

                await NotificationStore.InsertUserNotificationsAsync(notificationInfo, users.Select(u => u.UserId));

                if (notificationInfo.Lifetime == NotificationLifetime.OnlyOne)
                {
                    // One-time notifications cancel the user subscription.
                    await NotificationStore.DeleteUserSubscriptionAsync(
                        notificationInfo.TenantId,
                        users,
                        notificationInfo.Name);
                }
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Failed to persistent notification failed, reason: {message}", ex.Message);

                foreach (var provider in sendToProviders)
                {
                    // Persisting failed: hand the notification over to the background queue.
                    await ProcessingFailedToQueueAsync(provider, notificationInfo, users);
                }
            }
        }

        /// <summary>
        /// Publishes the notification to the subscribers through a single provider and
        /// records the send state.
        /// </summary>
        /// <param name="provider">Provider used to publish the notification.</param>
        /// <param name="notificationInfo">The notification to publish.</param>
        /// <param name="subscriptionUsers">Users subscribed to the notification.</param>
        protected async Task PublishToSubscriberAsync(
            INotificationPublishProvider provider,
            NotificationInfo notificationInfo,
            IEnumerable<UserIdentifier> subscriptionUsers)
        {
            var sendInfo = OnPublishing(provider, notificationInfo, subscriptionUsers);

            try
            {
                Logger.LogDebug("Sending notification with provider {ProviderName}", provider.Name);

                if (await provider.CanPublishAsync(notificationInfo))
                {
                    var context = new NotificationPublishContext(notificationInfo, subscriptionUsers);
                    await provider.PublishAsync(context);

                    sendInfo.Sent(context.Exception);
                    if (context.Exception == null && !context.Reason.IsNullOrWhiteSpace())
                    {
                        sendInfo.Cancel(context.Reason);
                    }

                    Logger.LogDebug(
                        "Send notification {NotificationName} with provider {ProviderName} was successful",
                        notificationInfo.Name,
                        provider.Name);
                }
                else
                {
                    sendInfo.Disbaled();
                }

                // NOTE(review): a failure while recording the send state is handled by the
                // catch below, which also re-queues the notification even though the provider
                // call itself may have succeeded - confirm this is intended.
                await OnPublished(sendInfo);
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Send notification error with provider {ProviderName}", provider.Name);
                Logger.LogWarning("Error message:{Message}", ex.Message);
                Logger.LogDebug(
                    "Failed to send notification {NotificationName}. Try to push notification to background job",
                    notificationInfo.Name);

                // Failed messages go to the background queue.
                await ProcessingFailedToQueueAsync(provider, notificationInfo, subscriptionUsers);

                try
                {
                    sendInfo.Sent(ex);
                    await OnPublished(sendInfo);
                }
                catch (Exception recordEx)
                {
                    // Recording the failure is best-effort: report it instead of swallowing it.
                    Logger.LogWarning(
                        recordEx,
                        "Failed to record the send state of notification {NotificationName} with provider {ProviderName}",
                        notificationInfo.Name,
                        provider.Name);
                }
            }
        }

        /// <summary>
        /// Pushes a notification that could not be processed to the background job queue.
        /// </summary>
        /// <remarks>
        /// If enqueueing fails as well, the notification is discarded.
        /// </remarks>
        /// <param name="provider">Provider the notification should be published with.</param>
        /// <param name="notificationInfo">The notification to enqueue.</param>
        /// <param name="subscriptionUsers">Users subscribed to the notification.</param>
        protected async Task ProcessingFailedToQueueAsync(
            INotificationPublishProvider provider,
            NotificationInfo notificationInfo,
            IEnumerable<UserIdentifier> subscriptionUsers)
        {
            try
            {
                await BackgroundJobManager.EnqueueAsync(
                    new NotificationPublishJobArgs(
                        notificationInfo.GetId(),
                        provider.GetType().AssemblyQualifiedName,
                        subscriptionUsers.ToList(),
                        notificationInfo.TenantId));
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Failed to push to background job, notification will be discarded, error cause: {message}", ex.Message);
            }
        }

        /// <summary>
        /// Creates the send record for a notification that is about to be published.
        /// </summary>
        /// <param name="provider">Provider used to publish the notification.</param>
        /// <param name="notification">The notification being published.</param>
        /// <param name="identifiers">Users the notification is published to.</param>
        /// <returns>A new send record stamped with the current clock time.</returns>
        protected virtual NotificationSendInfo OnPublishing(
            INotificationPublishProvider provider,
            NotificationInfo notification,
            IEnumerable<UserIdentifier> identifiers)
        {
            return new NotificationSendInfo(
                provider.Name,
                Clock.Now,
                notification,
                identifiers);
        }

        /// <summary>
        /// Stores the send state of a published notification.
        /// </summary>
        /// <param name="sendInfo">The send record to store.</param>
        protected async Task OnPublished(NotificationSendInfo sendInfo)
        {
            await NotificationStore.InsertSendStateAsync(sendInfo);
        }

        /// <summary>
        /// Returns the given culture, or a fallback when it is null or white space: the
        /// default-language setting, then the current culture name if that setting is null.
        /// </summary>
        private async Task<string> ResolveCultureAsync(string culture)
        {
            if (!culture.IsNullOrWhiteSpace())
            {
                return culture;
            }

            var defaultLanguage = await SettingProvider.GetOrNullAsync(LocalizationSettingNames.DefaultLanguage);
            return defaultLanguage ?? CultureInfo.CurrentCulture.Name;
        }

        /// <summary>
        /// Selects the publish providers for a notification: the providers requested by the
        /// event when any are given, otherwise the providers named by the definition when any
        /// are given, otherwise all registered providers (in reverse registration order).
        /// </summary>
        private IReadOnlyList<INotificationPublishProvider> SelectProviders(
            IEnumerable<string> useProviders,
            NotificationDefinition notification)
        {
            var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers);

            if (useProviders.Any())
            {
                providers = providers.Where(p => useProviders.Contains(p.Name));
            }
            else if (notification.Providers.Any())
            {
                providers = providers.Where(p => notification.Providers.Contains(p.Name));
            }

            // Materialize once; the result is enumerated by both the persist and publish steps.
            return providers.ToList();
        }

        /// <summary>
        /// Loads the subscribers, persists the notification and publishes it with every
        /// selected provider when at least one subscriber exists.
        /// </summary>
        private async Task PersistAndPublishAsync(
            NotificationInfo notificationInfo,
            IEnumerable<UserIdentifier> sendToUsers,
            Guid? tenantId,
            IReadOnlyList<INotificationPublishProvider> providers)
        {
            var subscriptionUsers = await GerSubscriptionUsersAsync(
                notificationInfo.Name,
                sendToUsers,
                tenantId);

            await PersistentNotificationAsync(
                notificationInfo,
                subscriptionUsers,
                providers);

            if (!subscriptionUsers.Any())
            {
                return;
            }

            foreach (var provider in providers)
            {
                await PublishToSubscriberAsync(provider, notificationInfo, subscriptionUsers);
            }
        }
    }
}